diff --git a/src/ScadaLink.AuditLog/Central/AuditCentralHealthSnapshot.cs b/src/ScadaLink.AuditLog/Central/AuditCentralHealthSnapshot.cs index c44453b..e728c51 100644 --- a/src/ScadaLink.AuditLog/Central/AuditCentralHealthSnapshot.cs +++ b/src/ScadaLink.AuditLog/Central/AuditCentralHealthSnapshot.cs @@ -1,3 +1,4 @@ +using System.Collections.Concurrent; using ScadaLink.AuditLog.Payload; namespace ScadaLink.AuditLog.Central; @@ -6,10 +7,10 @@ namespace ScadaLink.AuditLog.Central; /// Audit Log (#23) M6 Bundle E (T8, T9) — central singleton implementation of /// . Owns thread-safe /// counters for -/// CentralAuditWriteFailures + AuditRedactionFailure and -/// delegates SiteAuditTelemetryStalled to the -/// . Also implements the writer -/// surfaces ( + +/// CentralAuditWriteFailures + AuditRedactionFailure and a +/// per-site latched stalled-state map fed by the +/// . Also implements the +/// writer surfaces ( + /// ) so a single concrete object /// is the source of truth — DI binds those two interfaces to this same /// singleton instance on the central composition root. @@ -25,12 +26,14 @@ namespace ScadaLink.AuditLog.Central; /// the collector both receives and exposes the metric. /// /// -/// Tracker dependency. -/// is a separate singleton that owns its own actor lifecycle; this snapshot -/// just reads its -/// surface on each access. Keeping -/// the tracker as a separate type avoids tangling EventStream subscription -/// state with the simple Interlocked counters here. +/// Stalled-state plumbing. The per-site stalled latch lives directly +/// on this snapshot. is the +/// EventStream subscriber that pushes +/// publications in via +/// . Keeping the dictionary on this type (rather +/// than reading the tracker on every access) lets the snapshot be constructed +/// without an dependency — the tracker +/// is wired up later from the Akka bootstrap, once the system is built. /// /// public sealed class AuditCentralHealthSnapshot @@ -40,13 +43,7 @@ public sealed class AuditCentralHealthSnapshot { private int _centralAuditWriteFailures; private int _auditRedactionFailure; - private readonly SiteAuditTelemetryStalledTracker _stalledTracker; - - public AuditCentralHealthSnapshot(SiteAuditTelemetryStalledTracker stalledTracker) - { - _stalledTracker = stalledTracker - ?? throw new ArgumentNullException(nameof(stalledTracker)); - } + private readonly ConcurrentDictionary _stalled = new(); /// public int CentralAuditWriteFailures => @@ -58,7 +55,20 @@ public sealed class AuditCentralHealthSnapshot /// public IReadOnlyDictionary SiteAuditTelemetryStalled => - _stalledTracker.Snapshot(); + new Dictionary(_stalled); + + /// + /// Apply a publication + /// observed by . Public + /// so the tracker (which lives in the same assembly but is constructed + /// later from the Akka host) can push without a friend reference; + /// readers should call . + /// + public void ApplyStalled(SiteAuditTelemetryStalledChanged evt) + { + if (evt is null) return; + _stalled[evt.SiteId] = evt.Stalled; + } /// void ICentralAuditWriteFailureCounter.Increment() => diff --git a/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs b/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs index 40624e7..e1ed0fd 100644 --- a/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs +++ b/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs @@ -46,6 +46,7 @@ public sealed class SiteAuditTelemetryStalledTracker : IDisposable private readonly EventStream _eventStream; private readonly ConcurrentDictionary _state = new(); private readonly IActorRef? _subscriber; + private readonly AuditCentralHealthSnapshot? _snapshot; private bool _disposed; /// @@ -67,12 +68,24 @@ public sealed class SiteAuditTelemetryStalledTracker : IDisposable /// via Akka.TestKit so they exercise the production subscribe path. /// public SiteAuditTelemetryStalledTracker(EventStream eventStream) + : this(eventStream, snapshot: null) + { + } + + /// + /// Bare-stream ctor with an optional snapshot sink — the central + /// composition root passes the singleton + /// so every dictionary update + /// also lands on the central health surface. The bare ctor still cannot + /// subscribe (no actor system), but tests that drive the tracker via + /// get the snapshot push for free. + /// + public SiteAuditTelemetryStalledTracker(EventStream eventStream, AuditCentralHealthSnapshot? snapshot) { _eventStream = eventStream ?? throw new ArgumentNullException(nameof(eventStream)); - // No subscriber actor — see the remarks above. Apply() is exposed - // internally so the ActorSystem ctor's internal forwarder can update - // state without re-implementing the dictionary write. + // No subscriber actor — see the remarks on the parameterless overload. _subscriber = null; + _snapshot = snapshot; } /// @@ -82,9 +95,21 @@ public sealed class SiteAuditTelemetryStalledTracker : IDisposable /// per-site map. tears the subscriber down. /// public SiteAuditTelemetryStalledTracker(ActorSystem actorSystem) + : this(actorSystem, snapshot: null) + { + } + + /// + /// Production ctor with a snapshot sink — every observed + /// is mirrored onto the + /// shared so the central health + /// surface sees per-site stalled state without re-reading the tracker. + /// + public SiteAuditTelemetryStalledTracker(ActorSystem actorSystem, AuditCentralHealthSnapshot? snapshot) { ArgumentNullException.ThrowIfNull(actorSystem); _eventStream = actorSystem.EventStream; + _snapshot = snapshot; // Anonymous subscriber actor scoped to the system; props build it // with a callback into THIS tracker's Apply method so the actor's // single-threaded receive serialises every dictionary write. @@ -115,6 +140,11 @@ public sealed class SiteAuditTelemetryStalledTracker : IDisposable { if (evt is null) return; _state[evt.SiteId] = evt.Stalled; + // Mirror into the central health snapshot if wired so a reader of + // IAuditCentralHealthSnapshot sees the same per-site state without + // a second lookup. Snapshot is optional (test composition roots may + // skip it) so the null-coalesce is the safe path. + _snapshot?.ApplyStalled(evt); } public void Dispose() diff --git a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs index 6460b1a..626859f 100644 --- a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs @@ -269,17 +269,6 @@ public static class ServiceCollectionExtensions .Bind(config.GetSection(PartitionMaintenanceSectionName)); services.AddHostedService(); - // M6 Bundle E (T7): central-singleton tracker for per-site - // SiteAuditTelemetryStalledChanged events published by - // SiteAuditReconciliationActor. Singleton so the EventStream - // subscription registers exactly once on host startup; the tracker's - // ctor needs an ActorSystem which is composed by the AkkaHostedService - // — we go through a factory so the SP is the source of truth for the - // system reference rather than re-resolving it on the consumer side. - services.AddSingleton(sp => - new SiteAuditTelemetryStalledTracker( - sp.GetRequiredService())); - // M6 Bundle E (T8 + T9): central health snapshot — a single object // that owns the CentralAuditWriteFailures + AuditRedactionFailure // Interlocked counters AND surfaces them on @@ -289,6 +278,11 @@ public static class ServiceCollectionExtensions // routes into the shared counters; site nodes keep their existing // Site bridges (registered by AddAuditLogHealthMetricsBridge) so // the same counter type does not shadow the site-side metric. + // The snapshot itself has no actor-system dependency — the + // per-site stalled latch is fed by SiteAuditTelemetryStalledTracker + // which the Akka bootstrap wires up after ActorSystem.Create returns + // (the tracker is NOT registered here because its construction + // requires ActorSystem, which is not a DI-resolvable singleton). services.AddSingleton(); services.AddSingleton( sp => sp.GetRequiredService()); diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index 9425368..dce065a 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -34,6 +34,13 @@ public class AkkaHostedService : IHostedService private readonly CommunicationOptions _communicationOptions; private readonly ILogger _logger; private ActorSystem? _actorSystem; + /// + /// Auxiliary IDisposables (e.g. the SiteAuditTelemetryStalledTracker) + /// that this hosted service constructs at start time and must tear down + /// on shutdown — they don't fit the ActorSystem lifecycle but share its + /// process scope. + /// + private readonly List _trackedDisposables = new(); public AkkaHostedService( IServiceProvider serviceProvider, @@ -201,6 +208,31 @@ akka {{ public async Task StopAsync(CancellationToken cancellationToken) { + // Dispose auxiliary subscribers (e.g. SiteAuditTelemetryStalledTracker) + // BEFORE Akka shuts down so their EventStream unsubscribe calls run + // while the system is still alive. Per-tracker Dispose is wrapped in + // its own try so a misbehaving subscriber can't sink the shutdown. + // Snapshot the list inside a lock so a concurrent StartAsync (the + // test harness sometimes triggers a second start/stop interleaving) + // can't race the enumeration. Clearing the original list under the + // same lock leaves the next StartAsync with a clean slate. + IDisposable[] disposables; + lock (_trackedDisposables) + { + disposables = _trackedDisposables.ToArray(); + _trackedDisposables.Clear(); + } + foreach (var disposable in disposables) + { + try { disposable.Dispose(); } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Auxiliary subscriber {Type} threw during shutdown", + disposable.GetType().Name); + } + } + if (_actorSystem != null) { _logger.LogInformation("Shutting down Akka.NET actor system via CoordinatedShutdown..."); @@ -349,6 +381,31 @@ akka {{ "AuditLogIngestActor singleton created (gRPC server bound: {GrpcBound})", grpcServer is not null); + // Audit Log (#23) M6 Bundle E (T7): subscribe the per-site stalled + // telemetry tracker to the actor system EventStream NOW that the + // system exists. The tracker mirrors every + // SiteAuditTelemetryStalledChanged publication (from + // SiteAuditReconciliationActor — wired in a later bundle) into the + // AuditCentralHealthSnapshot singleton so the central health surface + // sees per-site stalled state. The tracker is constructed here rather + // than in AddAuditLogCentralMaintenance because its ctor needs an + // ActorSystem, which is not a DI-resolvable singleton — it's owned + // by this hosted service. The snapshot singleton is resolvable; + // passing it in seeds the tracker's Apply() so both internal state + // and the snapshot stay in lock-step. + var auditCentralSnapshot = _serviceProvider + .GetService(); + if (auditCentralSnapshot is not null) + { + var stalledTracker = new ScadaLink.AuditLog.Central.SiteAuditTelemetryStalledTracker( + _actorSystem!, auditCentralSnapshot); + lock (_trackedDisposables) + { + _trackedDisposables.Add(stalledTracker); + } + _logger.LogInformation("SiteAuditTelemetryStalledTracker subscribed to EventStream"); + } + // Site Call Audit (#22) — central singleton mirrors the AuditLogIngest // and NotificationOutbox patterns. M3's dual-write transaction routes // SiteCalls upserts through AuditLogIngestActor's own scope-per-message diff --git a/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditRedactionFailureCounterTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditRedactionFailureCounterTests.cs index 7fbfebf..795841b 100644 --- a/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditRedactionFailureCounterTests.cs +++ b/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditRedactionFailureCounterTests.cs @@ -24,8 +24,7 @@ public class CentralAuditRedactionFailureCounterTests : TestKit [Fact] public void Increment_Routes_To_Snapshot() { - using var tracker = new SiteAuditTelemetryStalledTracker(Sys); - var snapshot = new AuditCentralHealthSnapshot(tracker); + var snapshot = new AuditCentralHealthSnapshot(); var counter = new CentralAuditRedactionFailureCounter(snapshot); counter.Increment(); @@ -60,10 +59,10 @@ public class CentralAuditRedactionFailureCounterTests : TestKit var services = new ServiceCollection(); services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(NullLogger<>)); - // The AuditCentralHealthSnapshot ctor takes the stalled tracker - // which itself needs an ActorSystem — register a real system - // (test-kit's Sys) so the DI graph composes. - services.AddSingleton(Sys); + // AuditCentralHealthSnapshot no longer takes a tracker dependency — + // the tracker is constructed later by the Akka bootstrap because its + // ctor needs an ActorSystem (not a DI-resolvable singleton). The + // snapshot itself composes purely from primitives. services.AddAuditLog(config); services.AddAuditLogCentralMaintenance(config); using var provider = services.BuildServiceProvider(); diff --git a/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditWriteFailuresTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditWriteFailuresTests.cs index 0383e09..32b0a9a 100644 --- a/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditWriteFailuresTests.cs +++ b/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditWriteFailuresTests.cs @@ -115,9 +115,10 @@ public class CentralAuditWriteFailuresTests : TestKit { // AuditCentralHealthSnapshot implements both writer surfaces; bumping // through the writer interfaces is reflected on the read surface, and - // SiteAuditTelemetryStalled is sourced from the injected tracker. - using var tracker = new SiteAuditTelemetryStalledTracker(Sys); - var snapshot = new AuditCentralHealthSnapshot(tracker); + // the per-site stalled state is fed in via ApplyStalled — production + // wires that to a SiteAuditTelemetryStalledTracker, but the snapshot + // is testable in isolation against the same Apply surface. + var snapshot = new AuditCentralHealthSnapshot(); Assert.Equal(0, snapshot.CentralAuditWriteFailures); Assert.Equal(0, snapshot.AuditRedactionFailure); @@ -127,7 +128,11 @@ public class CentralAuditWriteFailuresTests : TestKit ((ICentralAuditWriteFailureCounter)snapshot).Increment(); ((ScadaLink.AuditLog.Payload.IAuditRedactionFailureCounter)snapshot).Increment(); - // Publish a stalled-changed event so the tracker registers a site. + // Wire the tracker so an EventStream publish reaches the snapshot. + // The tracker pushes into the snapshot's ApplyStalled when given + // the snapshot in its ctor; the tracker also keeps its own latch, + // but the snapshot read surface is what the central UI reads. + using var tracker = new SiteAuditTelemetryStalledTracker(Sys, snapshot); Sys.EventStream.Publish(new SiteAuditTelemetryStalledChanged("siteA", Stalled: true)); AwaitAssert(() => { @@ -143,9 +148,13 @@ public class CentralAuditWriteFailuresTests : TestKit } [Fact] - public void AuditCentralHealthSnapshot_Construction_Without_Tracker_Throws() + public void Snapshot_Empty_OnConstruction() { - Assert.Throws( - () => new AuditCentralHealthSnapshot(null!)); + // Sanity: the snapshot's three properties start at their zero values + // before any writer or stalled-event publication. + var snapshot = new AuditCentralHealthSnapshot(); + Assert.Equal(0, snapshot.CentralAuditWriteFailures); + Assert.Equal(0, snapshot.AuditRedactionFailure); + Assert.Empty(snapshot.SiteAuditTelemetryStalled); } }