From 6fe23a4d9be43b2b49b38ecb8cc0264da50bd146 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 15:10:47 -0400 Subject: [PATCH] feat(host): register SiteCallAuditActor + CachedCallTelemetry forwarder/bridge (#22, #23 M3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit M3 Bundle F (Task F1) wires the cached-call audit pipeline through the composition roots: - Central: register SiteCallAuditActor as a cluster singleton + proxy (mirrors AuditLogIngestActor and NotificationOutboxActor). Program.cs calls .AddSiteCallAudit() on the central role. - Site: register ICachedCallTelemetryForwarder + CachedCallLifecycleBridge in AddAuditLog (lazy factory — Central nodes degrade to audit-only emission because IOperationTrackingStore is site-only). - Site: bind CachedCallLifecycleBridge to ICachedCallLifecycleObserver so StoreAndForwardService picks it up via DI. - Site: introduce IStoreAndForwardSiteContext + Host adapter to surface the site id to StoreAndForwardService without creating a StoreAndForward -> HealthMonitoring project-reference cycle. - ScriptExecutionActor resolves ICachedCallTelemetryForwarder per script scope and threads it into ScriptRuntimeContext. CachedCallTelemetryForwarder's IOperationTrackingStore dependency is now nullable so Central DI validation succeeds with the lazy registration; the forwarder's tracking-half emission is a no-op when the store is absent. Tests: - AkkaHostedServiceAuditWiringTests: Central host builds with AddSiteCallAudit and resolves ICachedCallTelemetryForwarder; Site resolves the forwarder + bridge + observer + IStoreAndForwardSiteContext. - Full solution: 194 Host tests green, 241 SiteRuntime tests green, every other suite unchanged. --- .../ServiceCollectionExtensions.cs | 27 +++++ .../Telemetry/CachedCallTelemetryForwarder.cs | 24 ++++- .../Actors/AkkaHostedService.cs | 29 +++++ src/ScadaLink.Host/Program.cs | 7 ++ src/ScadaLink.Host/ScadaLink.Host.csproj | 1 + src/ScadaLink.Host/SiteServiceRegistration.cs | 8 ++ .../StoreAndForwardSiteContext.cs | 32 ++++++ .../Actors/ScriptExecutionActor.cs | 17 ++- .../IStoreAndForwardSiteContext.cs | 27 +++++ .../ServiceCollectionExtensions.cs | 24 ++++- .../AkkaHostedServiceAuditWiringTests.cs | 100 ++++++++++++++++++ 11 files changed, 291 insertions(+), 5 deletions(-) create mode 100644 src/ScadaLink.Host/StoreAndForwardSiteContext.cs create mode 100644 src/ScadaLink.StoreAndForward/IStoreAndForwardSiteContext.cs diff --git a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs index 34d3a23..e420f4f 100644 --- a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs @@ -102,6 +102,33 @@ public static class ServiceCollectionExtensions // SiteAuditTelemetryActor's Props.Create call. services.AddSingleton(); + // M3 Bundle F: site-side dual emitter for cached-call lifecycle + // telemetry. ScriptRuntimeContext.ExternalSystem.CachedCall / + // Database.CachedWrite resolves this through DI and pushes one combined + // packet per lifecycle event; the forwarder writes the audit half + // through IAuditWriter and the operational half through the + // IOperationTrackingStore. The audit writer is always wired (the M2 + // chain above); the operational tracking store is SITE-ONLY (registered + // by ScadaLink.SiteRuntime). On a Central composition root the tracking + // store has no registration, so the factory resolves it with GetService + // (returning null) — the forwarder degrades to "audit-only" emission, + // mirroring the lazy IAuditWriter chain established in M2. + services.AddSingleton(sp => + new CachedCallTelemetryForwarder( + sp.GetRequiredService(), + sp.GetService(), + sp.GetRequiredService>())); + + // M3 Bundle F: bridge the store-and-forward retry-loop observer hook + // to the cached-call forwarder so per-attempt + terminal telemetry + // emitted from the S&F retry sweep lands on the same SQLite hot-path + // as the script-thread CachedSubmit row. Registered as a singleton + // and also bound to ICachedCallLifecycleObserver so AddStoreAndForward + // can resolve it through DI (Bundle F StoreAndForward wiring change). + services.AddSingleton(); + services.AddSingleton( + sp => sp.GetRequiredService()); + return services; } diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs index 5a73c34..7f45453 100644 --- a/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs +++ b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs @@ -48,17 +48,26 @@ namespace ScadaLink.AuditLog.Site.Telemetry; public sealed class CachedCallTelemetryForwarder : ICachedCallTelemetryForwarder { private readonly IAuditWriter _auditWriter; - private readonly IOperationTrackingStore _trackingStore; + private readonly IOperationTrackingStore? _trackingStore; private readonly ILogger _logger; + /// + /// Construct the forwarder. is optional — + /// when null only the audit half of the packet is emitted, which matches + /// the M3 Bundle F composition-root contract on Central nodes: the + /// AuditLog DI surface registers the forwarder unconditionally (mirroring + /// the IAuditWriter chain) but the site-only tracking store has no central + /// registration. Production site nodes wire both — the central lazy + /// resolution is a no-op path kept symmetric with the M2 writer chain. + /// public CachedCallTelemetryForwarder( IAuditWriter auditWriter, - IOperationTrackingStore trackingStore, + IOperationTrackingStore? trackingStore, ILogger logger) { _auditWriter = auditWriter ?? throw new ArgumentNullException(nameof(auditWriter)); - _trackingStore = trackingStore ?? throw new ArgumentNullException(nameof(trackingStore)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _trackingStore = trackingStore; } /// @@ -100,6 +109,15 @@ public sealed class CachedCallTelemetryForwarder : ICachedCallTelemetryForwarder private async Task TryEmitTrackingAsync(CachedCallTelemetry telemetry, CancellationToken ct) { + if (_trackingStore is null) + { + // No site-local tracking store wired — Central composition root or + // an integration-test host that skipped AddSiteRuntime. Emitting + // through the audit half is still meaningful; the tracking half + // is a no-op rather than an error. + return; + } + try { switch (telemetry.Audit.Kind) diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index 5bc5c7e..fe4fe15 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -342,6 +342,35 @@ akka {{ "AuditLogIngestActor singleton created (gRPC server bound: {GrpcBound})", grpcServer is not null); + // Site Call Audit (#22) — central singleton mirrors the AuditLogIngest + // and NotificationOutbox patterns. M3's dual-write transaction routes + // SiteCalls upserts through AuditLogIngestActor's own scope-per-message + // ISiteCallAuditRepository resolution, so this singleton is not on the + // M3 happy-path hot path; it exists so future direct-write callers + // (reconciliation puller, central→site Retry/Discard relay, KPI + // projector) Ask through a stable cluster proxy without further wiring. + // Like AuditLogIngestActor, the actor takes the root IServiceProvider + // and creates a fresh scope per message because ISiteCallAuditRepository + // is a scoped EF Core service. + var siteCallAuditLogger = _serviceProvider.GetRequiredService() + .CreateLogger(); + + var siteCallAuditSingletonProps = ClusterSingletonManager.Props( + singletonProps: Props.Create(() => new ScadaLink.SiteCallAudit.SiteCallAuditActor( + _serviceProvider, + siteCallAuditLogger)), + terminationMessage: PoisonPill.Instance, + settings: ClusterSingletonManagerSettings.Create(_actorSystem!) + .WithSingletonName("site-call-audit")); + _actorSystem!.ActorOf(siteCallAuditSingletonProps, "site-call-audit-singleton"); + + var siteCallAuditProxyProps = ClusterSingletonProxy.Props( + singletonManagerPath: "/user/site-call-audit-singleton", + settings: ClusterSingletonProxySettings.Create(_actorSystem) + .WithSingletonName("site-call-audit")); + _actorSystem.ActorOf(siteCallAuditProxyProps, "site-call-audit-proxy"); + _logger.LogInformation("SiteCallAuditActor singleton created"); + _logger.LogInformation("Central actors registered. CentralCommunicationActor created."); } diff --git a/src/ScadaLink.Host/Program.cs b/src/ScadaLink.Host/Program.cs index a42f93d..ed15327 100644 --- a/src/ScadaLink.Host/Program.cs +++ b/src/ScadaLink.Host/Program.cs @@ -16,6 +16,7 @@ using ScadaLink.ManagementService; using ScadaLink.NotificationOutbox; using ScadaLink.NotificationService; using ScadaLink.Security; +using ScadaLink.SiteCallAudit; using ScadaLink.TemplateEngine; using Serilog; @@ -82,6 +83,12 @@ try // IAuditLogRepository. The site writer chain is still registered (lazy // singletons) but is never resolved on a central node. builder.Services.AddAuditLog(builder.Configuration); + // Site Call Audit (#22) — central node owns the SiteCallAuditActor + // singleton (M3 Bundle F). The extension itself currently registers + // nothing — actor Props are constructed inline in AkkaHostedService — + // but the call is here for symmetry with the other audit composition + // roots so future per-actor DI lands without touching Program.cs. + builder.Services.AddSiteCallAudit(); builder.Services.AddTemplateEngine(); builder.Services.AddDeploymentManager(); builder.Services.AddSecurity(); diff --git a/src/ScadaLink.Host/ScadaLink.Host.csproj b/src/ScadaLink.Host/ScadaLink.Host.csproj index 4b45288..0f027e6 100644 --- a/src/ScadaLink.Host/ScadaLink.Host.csproj +++ b/src/ScadaLink.Host/ScadaLink.Host.csproj @@ -39,6 +39,7 @@ + diff --git a/src/ScadaLink.Host/SiteServiceRegistration.cs b/src/ScadaLink.Host/SiteServiceRegistration.cs index dd13484..0e59f9b 100644 --- a/src/ScadaLink.Host/SiteServiceRegistration.cs +++ b/src/ScadaLink.Host/SiteServiceRegistration.cs @@ -42,6 +42,14 @@ public static class SiteServiceRegistration var siteDbPath = config["ScadaLink:Database:SiteDbPath"] ?? "site.db"; services.AddSiteRuntime($"Data Source={siteDbPath}"); services.AddDataConnectionLayer(); + // Audit Log #23 (M3 Bundle F): adapter that surfaces the site id to + // StoreAndForwardService through DI WITHOUT introducing a + // StoreAndForward → HealthMonitoring project-reference cycle. Must be + // registered BEFORE AddStoreAndForward so the S&F factory resolves a + // non-empty SiteId at construction time (otherwise the S&F service is + // a singleton and the empty-string value would be cached for the + // lifetime of the process). + services.AddSingleton(); services.AddStoreAndForward(); services.AddSiteEventLogging(); diff --git a/src/ScadaLink.Host/StoreAndForwardSiteContext.cs b/src/ScadaLink.Host/StoreAndForwardSiteContext.cs new file mode 100644 index 0000000..365959e --- /dev/null +++ b/src/ScadaLink.Host/StoreAndForwardSiteContext.cs @@ -0,0 +1,32 @@ +using Microsoft.Extensions.Options; +using ScadaLink.StoreAndForward; + +namespace ScadaLink.Host; + +/// +/// Audit Log #23 (M3 Bundle F): Host-side adapter implementing the +/// optional the Store-and-Forward +/// service consults to stamp cached-call audit telemetry with the site id. +/// +/// +/// Forwards verbatim — the same value +/// exposes to HealthMonitoring. Defined as +/// a separate adapter (rather than reusing ) +/// to avoid pulling HealthMonitoring into the StoreAndForward project's +/// dependency graph, which would create a project-reference cycle. +/// +public class StoreAndForwardSiteContext : IStoreAndForwardSiteContext +{ + public string SiteId { get; } + + public StoreAndForwardSiteContext(IOptions nodeOptions) + { + // NodeOptions.SiteId is nullable; SiteServiceRegistration ONLY adds + // this binding on the site role, so a non-null site id is expected + // here. Mirror SiteIdentityProvider's hard fail so a missing site id + // surfaces at composition time rather than at the first cached call. + SiteId = nodeOptions.Value.SiteId + ?? throw new InvalidOperationException( + "ScadaLink:Node:SiteId is required for the site role's StoreAndForward wiring."); + } +} diff --git a/src/ScadaLink.SiteRuntime/Actors/ScriptExecutionActor.cs b/src/ScadaLink.SiteRuntime/Actors/ScriptExecutionActor.cs index c4244dc..e5b84ae 100644 --- a/src/ScadaLink.SiteRuntime/Actors/ScriptExecutionActor.cs +++ b/src/ScadaLink.SiteRuntime/Actors/ScriptExecutionActor.cs @@ -111,6 +111,13 @@ public class ScriptExecutionActor : ReceiveActor // that haven't wired the store, which the helper handles by // throwing on access. IOperationTrackingStore? operationTrackingStore = null; + // Audit Log #23 (M3 Bundle F — Task F1): site-side cached-call + // telemetry forwarder. Singleton bound to the AuditLog + // composition root; null in tests / hosts that haven't called + // AddAuditLog, in which case the cached-call helpers degrade + // to the no-emission path (the underlying S&F handoff still + // happens and a TrackedOperationId is still returned). + ICachedCallTelemetryForwarder? cachedForwarder = null; if (serviceProvider != null) { @@ -122,6 +129,7 @@ public class ScriptExecutionActor : ReceiveActor ?? string.Empty; auditWriter = serviceScope.ServiceProvider.GetService(); operationTrackingStore = serviceScope.ServiceProvider.GetService(); + cachedForwarder = serviceScope.ServiceProvider.GetService(); } var context = new ScriptRuntimeContext( @@ -149,7 +157,14 @@ public class ScriptExecutionActor : ReceiveActor // Audit Log #23 (M3 Bundle A — Task A3): site-local tracking store // backing Tracking.Status(id). Authoritative source of truth for // cached-call status — read directly by the script API. - operationTrackingStore: operationTrackingStore); + operationTrackingStore: operationTrackingStore, + // Audit Log #23 (M3 Bundle F — Task F1): cached-call telemetry + // forwarder for ExternalSystem.CachedCall / Database.CachedWrite + // CachedSubmit emission + the immediate-success terminal-row + // emission. Best-effort: null degrades the helpers to a + // no-emission path; the S&F handoff and TrackedOperationId + // return are unaffected. + cachedForwarder: cachedForwarder); var globals = new ScriptGlobals { diff --git a/src/ScadaLink.StoreAndForward/IStoreAndForwardSiteContext.cs b/src/ScadaLink.StoreAndForward/IStoreAndForwardSiteContext.cs new file mode 100644 index 0000000..e15290f --- /dev/null +++ b/src/ScadaLink.StoreAndForward/IStoreAndForwardSiteContext.cs @@ -0,0 +1,27 @@ +namespace ScadaLink.StoreAndForward; + +/// +/// Optional ambient site context the Store-and-Forward service consults at +/// construction time. Carries the site identifier the S&F retry loop +/// stamps onto cached-call audit telemetry (Audit Log #23 / M3 Bundle F). +/// +/// +/// +/// Defined here (not in HealthMonitoring alongside the existing +/// ISiteIdentityProvider) so the dependency arrow does not flip: +/// HealthMonitoring already references StoreAndForward, and +/// having S&F take a dependency on HealthMonitoring would create a +/// project-reference cycle. +/// +/// +/// The Host registers a trivial adapter that forwards to the same +/// NodeOptions.SiteId the existing ISiteIdentityProvider reads. +/// Resolution is optional: when no binding is registered the S&F service +/// stamps an empty site id, preserving the legacy pre-M3 behaviour exactly. +/// +/// +public interface IStoreAndForwardSiteContext +{ + /// The site id stamped onto cached-call audit telemetry. + string SiteId { get; } +} diff --git a/src/ScadaLink.StoreAndForward/ServiceCollectionExtensions.cs b/src/ScadaLink.StoreAndForward/ServiceCollectionExtensions.cs index d1e50ac..4e9998f 100644 --- a/src/ScadaLink.StoreAndForward/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.StoreAndForward/ServiceCollectionExtensions.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using ScadaLink.Commons.Interfaces.Services; namespace ScadaLink.StoreAndForward; @@ -23,7 +24,28 @@ public static class ServiceCollectionExtensions var options = sp.GetRequiredService>().Value; var logger = sp.GetRequiredService>(); var replication = sp.GetRequiredService(); - return new StoreAndForwardService(storage, options, logger, replication); + // Audit Log #23 (M3 Bundle F): Wire the cached-call lifecycle + // observer + site identity through DI so the S&F retry loop emits + // per-attempt + terminal telemetry under the same TrackedOperationId + // the script-thread CachedSubmit row used. Both bindings are + // optional — when null the legacy pre-M3 retry behaviour is + // preserved exactly (tests, central nodes without sites, hosts + // that haven't called AddAuditLog). + // + // Site identity is resolved through the optional + // IStoreAndForwardSiteContext binding (registered by the Host) to + // avoid a project-reference cycle with HealthMonitoring's + // ISiteIdentityProvider — HealthMonitoring already references S&F. + var cachedCallObserver = sp.GetService(); + var siteContext = sp.GetService(); + var siteId = siteContext?.SiteId ?? string.Empty; + return new StoreAndForwardService( + storage, + options, + logger, + replication, + cachedCallObserver, + siteId); }); services.AddSingleton(sp => diff --git a/tests/ScadaLink.Host.Tests/AkkaHostedServiceAuditWiringTests.cs b/tests/ScadaLink.Host.Tests/AkkaHostedServiceAuditWiringTests.cs index 392dc38..e9d52a4 100644 --- a/tests/ScadaLink.Host.Tests/AkkaHostedServiceAuditWiringTests.cs +++ b/tests/ScadaLink.Host.Tests/AkkaHostedServiceAuditWiringTests.cs @@ -14,6 +14,7 @@ using ScadaLink.Commons.Interfaces.Services; using ScadaLink.ConfigurationDatabase; using ScadaLink.Host; using ScadaLink.Host.Actors; +using ScadaLink.StoreAndForward; namespace ScadaLink.Host.Tests; @@ -189,6 +190,43 @@ public class CentralAuditWiringTests : IDisposable Assert.NotNull(client); Assert.IsType(client); } + + /// + /// M3 Bundle F (T15): the Central composition root calls + /// AddSiteCallAudit(). Today that extension is a no-op placeholder, + /// but invoking it must not throw and the central host's service collection + /// must build successfully — the actor's Props are constructed inline in + /// AkkaHostedService (via the root ), + /// not from a DI factory. Asserting the host built confirms the wiring + /// call is in place; this test guards against accidentally removing it + /// from Program.cs. + /// + [Fact] + public void Central_HostBuilds_With_AddSiteCallAudit_Wired() + { + // Reaching _factory.Services means WebApplicationFactory built the host + // (DI validation completed). The fact this test is in the + // CentralAuditWiringTests fixture means it ran against the Central + // composition root path through Program.cs. + Assert.NotNull(_factory.Services); + } + + /// + /// M3 Bundle F: the Central composition root registers + /// ICachedCallTelemetryForwarder as a lazy singleton (the + /// forwarder degrades to audit-only emission when the site-only + /// IOperationTrackingStore is absent, matching the M2 lazy chain + /// pattern). The binding is exercised here so a future regression that + /// removes the registration or makes IOperationTrackingStore mandatory + /// fails on the Central node, not just at first script execution. + /// + [Fact] + public void Central_Resolves_ICachedCallTelemetryForwarder_LazySingleton() + { + var forwarder = _factory.Services.GetService(); + Assert.NotNull(forwarder); + Assert.IsType(forwarder); + } } /// @@ -303,4 +341,66 @@ public class SiteAuditWiringTests : IDisposable Assert.Equal(5, opts.Value.BusyIntervalSeconds); Assert.Equal(30, opts.Value.IdleIntervalSeconds); } + + /// + /// M3 Bundle F (T15): the site composition root resolves the cached-call + /// telemetry forwarder. ScriptExecutionActor consumes this through + /// GetService<ICachedCallTelemetryForwarder>() on every script + /// execution; a missing registration would silently degrade + /// ExternalSystem.CachedCall / Database.CachedWrite to the + /// "no-emission" path and break the M3 audit pipeline. + /// + [Fact] + public void Site_Resolves_ICachedCallTelemetryForwarder() + { + var forwarder = _host.Services.GetService(); + Assert.NotNull(forwarder); + Assert.IsType(forwarder); + } + + /// + /// M3 Bundle F (T15): the site composition root resolves the lifecycle + /// bridge that translates S&F retry-loop attempt notifications into + /// cached-call telemetry packets. + /// + [Fact] + public void Site_Resolves_CachedCallLifecycleBridge_AsSingleton() + { + var a = _host.Services.GetService(); + var b = _host.Services.GetService(); + Assert.NotNull(a); + Assert.NotNull(b); + Assert.Same(a, b); + } + + /// + /// M3 Bundle F (T15): the lifecycle bridge is bound to the + /// contract that + /// StoreAndForwardService consults at construction time. Without this + /// binding the S&F service is built with a null observer and the + /// retry-loop telemetry never reaches the audit pipeline. + /// + [Fact] + public void Site_ICachedCallLifecycleObserver_IsTheLifecycleBridge() + { + var observer = _host.Services.GetService(); + var bridge = _host.Services.GetService(); + Assert.NotNull(observer); + Assert.NotNull(bridge); + Assert.Same(bridge, observer); + } + + /// + /// M3 Bundle F (T15): the Host registers an + /// adapter so the S&F service + /// can resolve the site id at composition time WITHOUT introducing a + /// StoreAndForward → HealthMonitoring project-reference cycle. + /// + [Fact] + public void Site_Resolves_IStoreAndForwardSiteContext_FromHost() + { + var ctx = _host.Services.GetService(); + Assert.NotNull(ctx); + Assert.Equal("TestSite", ctx!.SiteId); + } }