From ef49b55cf6c116d1d523ecca21bc2f5bc6cd9194 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 19:25:28 -0400 Subject: [PATCH] fix(health): decouple AuditCentralHealthSnapshot from ActorSystem (#23 M6) The snapshot's per-site stalled latch now lives on the snapshot itself and is fed by SiteAuditTelemetryStalledTracker via ApplyStalled, removing the chain that required ActorSystem at DI composition time. The tracker is now constructed by AkkaHostedService once ActorSystem.Create returns, with a lock-guarded auxiliary-disposable list so concurrent host start/stop in tests cannot race the enumeration. --- .../Central/AuditCentralHealthSnapshot.cs | 46 +++++++++------ .../SiteAuditTelemetryStalledTracker.cs | 36 +++++++++++- .../ServiceCollectionExtensions.cs | 16 ++---- .../Actors/AkkaHostedService.cs | 57 +++++++++++++++++++ ...entralAuditRedactionFailureCounterTests.cs | 11 ++-- .../Central/CentralAuditWriteFailuresTests.cs | 23 +++++--- 6 files changed, 144 insertions(+), 45 deletions(-) diff --git a/src/ScadaLink.AuditLog/Central/AuditCentralHealthSnapshot.cs b/src/ScadaLink.AuditLog/Central/AuditCentralHealthSnapshot.cs index c44453b..e728c51 100644 --- a/src/ScadaLink.AuditLog/Central/AuditCentralHealthSnapshot.cs +++ b/src/ScadaLink.AuditLog/Central/AuditCentralHealthSnapshot.cs @@ -1,3 +1,4 @@ +using System.Collections.Concurrent; using ScadaLink.AuditLog.Payload; namespace ScadaLink.AuditLog.Central; @@ -6,10 +7,10 @@ namespace ScadaLink.AuditLog.Central; /// Audit Log (#23) M6 Bundle E (T8, T9) — central singleton implementation of /// . Owns thread-safe /// counters for -/// CentralAuditWriteFailures + AuditRedactionFailure and -/// delegates SiteAuditTelemetryStalled to the -/// . Also implements the writer -/// surfaces ( + +/// CentralAuditWriteFailures + AuditRedactionFailure and a +/// per-site latched stalled-state map fed by the +/// . 
Also implements the +/// writer surfaces ( + /// ) so a single concrete object /// is the source of truth — DI binds those two interfaces to this same /// singleton instance on the central composition root. @@ -25,12 +26,14 @@ namespace ScadaLink.AuditLog.Central; /// the collector both receives and exposes the metric. /// /// -/// Tracker dependency. -/// is a separate singleton that owns its own actor lifecycle; this snapshot -/// just reads its -/// surface on each access. Keeping -/// the tracker as a separate type avoids tangling EventStream subscription -/// state with the simple Interlocked counters here. +/// Stalled-state plumbing. The per-site stalled latch lives directly +/// on this snapshot. is the +/// EventStream subscriber that pushes +/// publications in via +/// . Keeping the dictionary on this type (rather +/// than reading the tracker on every access) lets the snapshot be constructed +/// without an dependency — the tracker +/// is wired up later from the Akka bootstrap, once the system is built. /// /// public sealed class AuditCentralHealthSnapshot @@ -40,13 +43,7 @@ public sealed class AuditCentralHealthSnapshot { private int _centralAuditWriteFailures; private int _auditRedactionFailure; - private readonly SiteAuditTelemetryStalledTracker _stalledTracker; - - public AuditCentralHealthSnapshot(SiteAuditTelemetryStalledTracker stalledTracker) - { - _stalledTracker = stalledTracker - ?? throw new ArgumentNullException(nameof(stalledTracker)); - } + private readonly ConcurrentDictionary _stalled = new(); /// public int CentralAuditWriteFailures => @@ -58,7 +55,20 @@ public sealed class AuditCentralHealthSnapshot /// public IReadOnlyDictionary SiteAuditTelemetryStalled => - _stalledTracker.Snapshot(); + new Dictionary(_stalled); + + /// + /// Apply a publication + /// observed by . 
Public + /// so the tracker (which lives in the same assembly but is constructed + /// later from the Akka host) can push without a friend reference; + /// readers should call . + /// + public void ApplyStalled(SiteAuditTelemetryStalledChanged evt) + { + if (evt is null) return; + _stalled[evt.SiteId] = evt.Stalled; + } /// void ICentralAuditWriteFailureCounter.Increment() => diff --git a/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs b/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs index 40624e7..e1ed0fd 100644 --- a/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs +++ b/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs @@ -46,6 +46,7 @@ public sealed class SiteAuditTelemetryStalledTracker : IDisposable private readonly EventStream _eventStream; private readonly ConcurrentDictionary _state = new(); private readonly IActorRef? _subscriber; + private readonly AuditCentralHealthSnapshot? _snapshot; private bool _disposed; /// @@ -67,12 +68,24 @@ public sealed class SiteAuditTelemetryStalledTracker : IDisposable /// via Akka.TestKit so they exercise the production subscribe path. /// public SiteAuditTelemetryStalledTracker(EventStream eventStream) + : this(eventStream, snapshot: null) + { + } + + /// + /// Bare-stream ctor with an optional snapshot sink — the central + /// composition root passes the singleton + /// so every dictionary update + /// also lands on the central health surface. The bare ctor still cannot + /// subscribe (no actor system), but tests that drive the tracker via + /// get the snapshot push for free. + /// + public SiteAuditTelemetryStalledTracker(EventStream eventStream, AuditCentralHealthSnapshot? snapshot) { _eventStream = eventStream ?? throw new ArgumentNullException(nameof(eventStream)); - // No subscriber actor — see the remarks above. 
Apply() is exposed + - // internally so the ActorSystem ctor's internal forwarder can update - // state without re-implementing the dictionary write. + // No subscriber actor — see the remarks on the EventStream-only overload. _subscriber = null; + _snapshot = snapshot; } /// @@ -82,9 +95,21 @@ public sealed class SiteAuditTelemetryStalledTracker : IDisposable /// per-site map. tears the subscriber down. /// public SiteAuditTelemetryStalledTracker(ActorSystem actorSystem) + : this(actorSystem, snapshot: null) + { + } + + /// + /// Production ctor with a snapshot sink — every observed + /// is mirrored onto the + /// shared so the central health + /// surface sees per-site stalled state without re-reading the tracker. + /// + public SiteAuditTelemetryStalledTracker(ActorSystem actorSystem, AuditCentralHealthSnapshot? snapshot) { ArgumentNullException.ThrowIfNull(actorSystem); _eventStream = actorSystem.EventStream; + _snapshot = snapshot; // Anonymous subscriber actor scoped to the system; props build it // with a callback into THIS tracker's Apply method so the actor's // single-threaded receive serialises every dictionary write. @@ -115,6 +140,11 @@ public sealed class SiteAuditTelemetryStalledTracker : IDisposable { if (evt is null) return; _state[evt.SiteId] = evt.Stalled; + // Mirror into the central health snapshot if wired so a reader of + // IAuditCentralHealthSnapshot sees the same per-site state without + // a second lookup. Snapshot is optional (test composition roots may + // skip it) so the null-conditional call is the safe path. 
+ _snapshot?.ApplyStalled(evt); } public void Dispose() diff --git a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs index 6460b1a..626859f 100644 --- a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs @@ -269,17 +269,6 @@ public static class ServiceCollectionExtensions .Bind(config.GetSection(PartitionMaintenanceSectionName)); services.AddHostedService(); - // M6 Bundle E (T7): central-singleton tracker for per-site - // SiteAuditTelemetryStalledChanged events published by - // SiteAuditReconciliationActor. Singleton so the EventStream - // subscription registers exactly once on host startup; the tracker's - // ctor needs an ActorSystem which is composed by the AkkaHostedService - // — we go through a factory so the SP is the source of truth for the - // system reference rather than re-resolving it on the consumer side. - services.AddSingleton(sp => - new SiteAuditTelemetryStalledTracker( - sp.GetRequiredService())); - // M6 Bundle E (T8 + T9): central health snapshot — a single object // that owns the CentralAuditWriteFailures + AuditRedactionFailure // Interlocked counters AND surfaces them on @@ -289,6 +278,11 @@ public static class ServiceCollectionExtensions // routes into the shared counters; site nodes keep their existing // Site bridges (registered by AddAuditLogHealthMetricsBridge) so // the same counter type does not shadow the site-side metric. + // The snapshot itself has no actor-system dependency — the + // per-site stalled latch is fed by SiteAuditTelemetryStalledTracker + // which the Akka bootstrap wires up after ActorSystem.Create returns + // (the tracker is NOT registered here because its construction + // requires ActorSystem, which is not a DI-resolvable singleton). 
services.AddSingleton(); services.AddSingleton( sp => sp.GetRequiredService()); diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index 9425368..dce065a 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -34,6 +34,13 @@ public class AkkaHostedService : IHostedService private readonly CommunicationOptions _communicationOptions; private readonly ILogger _logger; private ActorSystem? _actorSystem; + /// + /// Auxiliary IDisposables (e.g. the SiteAuditTelemetryStalledTracker) + /// that this hosted service constructs at start time and must tear down + /// on shutdown — they don't fit the ActorSystem lifecycle but share its + /// process scope. + /// + private readonly List _trackedDisposables = new(); public AkkaHostedService( IServiceProvider serviceProvider, @@ -201,6 +208,31 @@ akka {{ public async Task StopAsync(CancellationToken cancellationToken) { + // Dispose auxiliary subscribers (e.g. SiteAuditTelemetryStalledTracker) + // BEFORE Akka shuts down so their EventStream unsubscribe calls run + // while the system is still alive. Per-tracker Dispose is wrapped in + // its own try so a misbehaving subscriber can't sink the shutdown. + // Snapshot the list inside a lock so a concurrent StartAsync (the + // test harness sometimes triggers a second start/stop interleaving) + // can't race the enumeration. Clearing the original list under the + // same lock leaves the next StartAsync with a clean slate. 
+ IDisposable[] disposables; + lock (_trackedDisposables) + { + disposables = _trackedDisposables.ToArray(); + _trackedDisposables.Clear(); + } + foreach (var disposable in disposables) + { + try { disposable.Dispose(); } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Auxiliary subscriber {Type} threw during shutdown", + disposable.GetType().Name); + } + } + if (_actorSystem != null) { _logger.LogInformation("Shutting down Akka.NET actor system via CoordinatedShutdown..."); @@ -349,6 +381,31 @@ akka {{ "AuditLogIngestActor singleton created (gRPC server bound: {GrpcBound})", grpcServer is not null); + // Audit Log (#23) M6 Bundle E (T7): subscribe the per-site stalled + // telemetry tracker to the actor system EventStream NOW that the + // system exists. The tracker mirrors every + // SiteAuditTelemetryStalledChanged publication (from + // SiteAuditReconciliationActor — wired in a later bundle) into the + // AuditCentralHealthSnapshot singleton so the central health surface + // sees per-site stalled state. The tracker is constructed here rather + // than in AddAuditLogCentralMaintenance because its ctor needs an + // ActorSystem, which is not a DI-resolvable singleton — it's owned + // by this hosted service. The snapshot singleton is resolvable; + // passing it in seeds the tracker's Apply() so both internal state + // and the snapshot stay in lock-step. + var auditCentralSnapshot = _serviceProvider + .GetService(); + if (auditCentralSnapshot is not null) + { + var stalledTracker = new ScadaLink.AuditLog.Central.SiteAuditTelemetryStalledTracker( + _actorSystem!, auditCentralSnapshot); + lock (_trackedDisposables) + { + _trackedDisposables.Add(stalledTracker); + } + _logger.LogInformation("SiteAuditTelemetryStalledTracker subscribed to EventStream"); + } + // Site Call Audit (#22) — central singleton mirrors the AuditLogIngest // and NotificationOutbox patterns. 
M3's dual-write transaction routes // SiteCalls upserts through AuditLogIngestActor's own scope-per-message diff --git a/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditRedactionFailureCounterTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditRedactionFailureCounterTests.cs index 7fbfebf..795841b 100644 --- a/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditRedactionFailureCounterTests.cs +++ b/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditRedactionFailureCounterTests.cs @@ -24,8 +24,7 @@ public class CentralAuditRedactionFailureCounterTests : TestKit [Fact] public void Increment_Routes_To_Snapshot() { - using var tracker = new SiteAuditTelemetryStalledTracker(Sys); - var snapshot = new AuditCentralHealthSnapshot(tracker); + var snapshot = new AuditCentralHealthSnapshot(); var counter = new CentralAuditRedactionFailureCounter(snapshot); counter.Increment(); @@ -60,10 +59,10 @@ public class CentralAuditRedactionFailureCounterTests : TestKit var services = new ServiceCollection(); services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(NullLogger<>)); - // The AuditCentralHealthSnapshot ctor takes the stalled tracker - // which itself needs an ActorSystem — register a real system - // (test-kit's Sys) so the DI graph composes. - services.AddSingleton(Sys); + // AuditCentralHealthSnapshot no longer takes a tracker dependency — + // the tracker is constructed later by the Akka bootstrap because its + // ctor needs an ActorSystem (not a DI-resolvable singleton). The + // snapshot itself composes purely from primitives. 
services.AddAuditLog(config); services.AddAuditLogCentralMaintenance(config); using var provider = services.BuildServiceProvider(); diff --git a/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditWriteFailuresTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditWriteFailuresTests.cs index 0383e09..32b0a9a 100644 --- a/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditWriteFailuresTests.cs +++ b/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditWriteFailuresTests.cs @@ -115,9 +115,10 @@ public class CentralAuditWriteFailuresTests : TestKit { // AuditCentralHealthSnapshot implements both writer surfaces; bumping // through the writer interfaces is reflected on the read surface, and - // SiteAuditTelemetryStalled is sourced from the injected tracker. - using var tracker = new SiteAuditTelemetryStalledTracker(Sys); - var snapshot = new AuditCentralHealthSnapshot(tracker); + // the per-site stalled state is fed in via ApplyStalled — production + // wires that to a SiteAuditTelemetryStalledTracker, but the snapshot + // is testable in isolation against the same Apply surface. + var snapshot = new AuditCentralHealthSnapshot(); Assert.Equal(0, snapshot.CentralAuditWriteFailures); Assert.Equal(0, snapshot.AuditRedactionFailure); @@ -127,7 +128,11 @@ public class CentralAuditWriteFailuresTests : TestKit ((ICentralAuditWriteFailureCounter)snapshot).Increment(); ((ScadaLink.AuditLog.Payload.IAuditRedactionFailureCounter)snapshot).Increment(); - // Publish a stalled-changed event so the tracker registers a site. + // Wire the tracker so an EventStream publish reaches the snapshot. + // The tracker pushes into the snapshot's ApplyStalled when given + // the snapshot in its ctor; the tracker also keeps its own latch, + // but the snapshot read surface is what the central UI reads. 
+ using var tracker = new SiteAuditTelemetryStalledTracker(Sys, snapshot); Sys.EventStream.Publish(new SiteAuditTelemetryStalledChanged("siteA", Stalled: true)); AwaitAssert(() => { @@ -143,9 +148,13 @@ public class CentralAuditWriteFailuresTests : TestKit } [Fact] - public void AuditCentralHealthSnapshot_Construction_Without_Tracker_Throws() + public void Snapshot_Empty_OnConstruction() { - Assert.Throws( - () => new AuditCentralHealthSnapshot(null!)); + // Sanity: the snapshot's three properties start at their zero values + // before any writer or stalled-event publication. + var snapshot = new AuditCentralHealthSnapshot(); + Assert.Equal(0, snapshot.CentralAuditWriteFailures); + Assert.Equal(0, snapshot.AuditRedactionFailure); + Assert.Empty(snapshot.SiteAuditTelemetryStalled); } }