fix(health): decouple AuditCentralHealthSnapshot from ActorSystem (#23 M6)

The snapshot's per-site stalled latch now lives on the snapshot itself
and is fed by SiteAuditTelemetryStalledTracker via ApplyStalled, removing
the chain that required ActorSystem at DI composition time. The tracker
is now constructed by AkkaHostedService once ActorSystem.Create returns,
with a lock-guarded auxiliary-disposable list so concurrent host
start/stop in tests cannot race the enumeration.
This commit is contained in:
Joseph Doherty
2026-05-20 19:25:28 -04:00
parent 2744011ce9
commit ef49b55cf6
6 changed files with 144 additions and 45 deletions

View File

@@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using ScadaLink.AuditLog.Payload;
namespace ScadaLink.AuditLog.Central;
@@ -6,10 +7,10 @@ namespace ScadaLink.AuditLog.Central;
/// Audit Log (#23) M6 Bundle E (T8, T9) — central singleton implementation of
/// <see cref="IAuditCentralHealthSnapshot"/>. Owns thread-safe
/// <see cref="System.Threading.Interlocked"/> counters for
/// <c>CentralAuditWriteFailures</c> + <c>AuditRedactionFailure</c> and
/// delegates <c>SiteAuditTelemetryStalled</c> to the
/// <see cref="SiteAuditTelemetryStalledTracker"/>. Also implements the writer
/// surfaces (<see cref="ICentralAuditWriteFailureCounter"/> +
/// <c>CentralAuditWriteFailures</c> + <c>AuditRedactionFailure</c> and a
/// per-site latched stalled-state map fed by the
/// <see cref="SiteAuditTelemetryStalledTracker"/>. Also implements the
/// writer surfaces (<see cref="ICentralAuditWriteFailureCounter"/> +
/// <see cref="IAuditRedactionFailureCounter"/>) so a single concrete object
/// is the source of truth — DI binds those two interfaces to this same
/// singleton instance on the central composition root.
@@ -25,12 +26,14 @@ namespace ScadaLink.AuditLog.Central;
/// the collector both receives and exposes the metric.
/// </para>
/// <para>
/// <b>Tracker dependency.</b> <see cref="SiteAuditTelemetryStalledTracker"/>
/// is a separate singleton that owns its own actor lifecycle; this snapshot
/// just reads its <see cref="SiteAuditTelemetryStalledTracker.Snapshot"/>
/// surface on each <see cref="SiteAuditTelemetryStalled"/> access. Keeping
/// the tracker as a separate type avoids tangling EventStream subscription
/// state with the simple Interlocked counters here.
/// <b>Stalled-state plumbing.</b> The per-site stalled latch lives directly
/// on this snapshot. <see cref="SiteAuditTelemetryStalledTracker"/> is the
/// EventStream subscriber that pushes
/// <see cref="SiteAuditTelemetryStalledChanged"/> publications in via
/// <see cref="ApplyStalled"/>. Keeping the dictionary on this type (rather
/// than reading the tracker on every access) lets the snapshot be constructed
/// without an <see cref="Akka.Actor.ActorSystem"/> dependency — the tracker
/// is wired up later from the Akka bootstrap, once the system is built.
/// </para>
/// </remarks>
public sealed class AuditCentralHealthSnapshot
@@ -40,13 +43,7 @@ public sealed class AuditCentralHealthSnapshot
{
private int _centralAuditWriteFailures;
private int _auditRedactionFailure;
private readonly SiteAuditTelemetryStalledTracker _stalledTracker;
public AuditCentralHealthSnapshot(SiteAuditTelemetryStalledTracker stalledTracker)
{
_stalledTracker = stalledTracker
?? throw new ArgumentNullException(nameof(stalledTracker));
}
private readonly ConcurrentDictionary<string, bool> _stalled = new();
/// <inheritdoc/>
public int CentralAuditWriteFailures =>
@@ -58,7 +55,20 @@ public sealed class AuditCentralHealthSnapshot
/// <inheritdoc/>
public IReadOnlyDictionary<string, bool> SiteAuditTelemetryStalled =>
_stalledTracker.Snapshot();
new Dictionary<string, bool>(_stalled);
/// <summary>
/// Apply a <see cref="SiteAuditTelemetryStalledChanged"/> publication
/// observed by <see cref="SiteAuditTelemetryStalledTracker"/>. Public
/// so the tracker (which lives in the same assembly but is constructed
/// later from the Akka host) can push without a friend reference;
/// readers should call <see cref="SiteAuditTelemetryStalled"/>.
/// </summary>
public void ApplyStalled(SiteAuditTelemetryStalledChanged evt)
{
if (evt is null) return;
_stalled[evt.SiteId] = evt.Stalled;
}
/// <inheritdoc/>
void ICentralAuditWriteFailureCounter.Increment() =>

View File

@@ -46,6 +46,7 @@ public sealed class SiteAuditTelemetryStalledTracker : IDisposable
private readonly EventStream _eventStream;
private readonly ConcurrentDictionary<string, bool> _state = new();
private readonly IActorRef? _subscriber;
private readonly AuditCentralHealthSnapshot? _snapshot;
private bool _disposed;
/// <summary>
@@ -67,12 +68,24 @@ public sealed class SiteAuditTelemetryStalledTracker : IDisposable
/// via Akka.TestKit so they exercise the production subscribe path.
/// </remarks>
public SiteAuditTelemetryStalledTracker(EventStream eventStream)
: this(eventStream, snapshot: null)
{
}
/// <summary>
/// Bare-stream ctor with an optional snapshot sink — the central
/// composition root passes the singleton
/// <see cref="AuditCentralHealthSnapshot"/> so every dictionary update
/// also lands on the central health surface. The bare ctor still cannot
/// subscribe (no actor system), but tests that drive the tracker via
/// <see cref="Apply"/> get the snapshot push for free.
/// </summary>
public SiteAuditTelemetryStalledTracker(EventStream eventStream, AuditCentralHealthSnapshot? snapshot)
{
_eventStream = eventStream ?? throw new ArgumentNullException(nameof(eventStream));
// No subscriber actor — see the remarks above. Apply() is exposed
// internally so the ActorSystem ctor's internal forwarder can update
// state without re-implementing the dictionary write.
// No subscriber actor — see the remarks on the parameterless overload.
_subscriber = null;
_snapshot = snapshot;
}
/// <summary>
@@ -82,9 +95,21 @@ public sealed class SiteAuditTelemetryStalledTracker : IDisposable
/// per-site map. <see cref="Dispose"/> tears the subscriber down.
/// </summary>
public SiteAuditTelemetryStalledTracker(ActorSystem actorSystem)
: this(actorSystem, snapshot: null)
{
}
/// <summary>
/// Production ctor with a snapshot sink — every observed
/// <see cref="SiteAuditTelemetryStalledChanged"/> is mirrored onto the
/// shared <see cref="AuditCentralHealthSnapshot"/> so the central health
/// surface sees per-site stalled state without re-reading the tracker.
/// </summary>
public SiteAuditTelemetryStalledTracker(ActorSystem actorSystem, AuditCentralHealthSnapshot? snapshot)
{
ArgumentNullException.ThrowIfNull(actorSystem);
_eventStream = actorSystem.EventStream;
_snapshot = snapshot;
// Anonymous subscriber actor scoped to the system; props build it
// with a callback into THIS tracker's Apply method so the actor's
// single-threaded receive serialises every dictionary write.
@@ -115,6 +140,11 @@ public sealed class SiteAuditTelemetryStalledTracker : IDisposable
{
if (evt is null) return;
_state[evt.SiteId] = evt.Stalled;
// Mirror into the central health snapshot if wired so a reader of
// IAuditCentralHealthSnapshot sees the same per-site state without
// a second lookup. Snapshot is optional (test composition roots may
// skip it) so the null-coalesce is the safe path.
_snapshot?.ApplyStalled(evt);
}
public void Dispose()

View File

@@ -269,17 +269,6 @@ public static class ServiceCollectionExtensions
.Bind(config.GetSection(PartitionMaintenanceSectionName));
services.AddHostedService<AuditLogPartitionMaintenanceService>();
// M6 Bundle E (T7): central-singleton tracker for per-site
// SiteAuditTelemetryStalledChanged events published by
// SiteAuditReconciliationActor. Singleton so the EventStream
// subscription registers exactly once on host startup; the tracker's
// ctor needs an ActorSystem which is composed by the AkkaHostedService
// — we go through a factory so the SP is the source of truth for the
// system reference rather than re-resolving it on the consumer side.
services.AddSingleton<SiteAuditTelemetryStalledTracker>(sp =>
new SiteAuditTelemetryStalledTracker(
sp.GetRequiredService<Akka.Actor.ActorSystem>()));
// M6 Bundle E (T8 + T9): central health snapshot — a single object
// that owns the CentralAuditWriteFailures + AuditRedactionFailure
// Interlocked counters AND surfaces them on
@@ -289,6 +278,11 @@ public static class ServiceCollectionExtensions
// routes into the shared counters; site nodes keep their existing
// Site bridges (registered by AddAuditLogHealthMetricsBridge) so
// the same counter type does not shadow the site-side metric.
// The snapshot itself has no actor-system dependency — the
// per-site stalled latch is fed by SiteAuditTelemetryStalledTracker
// which the Akka bootstrap wires up after ActorSystem.Create returns
// (the tracker is NOT registered here because its construction
// requires ActorSystem, which is not a DI-resolvable singleton).
services.AddSingleton<AuditCentralHealthSnapshot>();
services.AddSingleton<IAuditCentralHealthSnapshot>(
sp => sp.GetRequiredService<AuditCentralHealthSnapshot>());

View File

@@ -34,6 +34,13 @@ public class AkkaHostedService : IHostedService
private readonly CommunicationOptions _communicationOptions;
private readonly ILogger<AkkaHostedService> _logger;
private ActorSystem? _actorSystem;
/// <summary>
/// Auxiliary IDisposables (e.g. the SiteAuditTelemetryStalledTracker)
/// that this hosted service constructs at start time and must tear down
/// on shutdown — they don't fit the ActorSystem lifecycle but share its
/// process scope.
/// </summary>
private readonly List<IDisposable> _trackedDisposables = new();
public AkkaHostedService(
IServiceProvider serviceProvider,
@@ -201,6 +208,31 @@ akka {{
public async Task StopAsync(CancellationToken cancellationToken)
{
// Dispose auxiliary subscribers (e.g. SiteAuditTelemetryStalledTracker)
// BEFORE Akka shuts down so their EventStream unsubscribe calls run
// while the system is still alive. Per-tracker Dispose is wrapped in
// its own try so a misbehaving subscriber can't sink the shutdown.
// Snapshot the list inside a lock so a concurrent StartAsync (the
// test harness sometimes triggers a second start/stop interleaving)
// can't race the enumeration. Clearing the original list under the
// same lock leaves the next StartAsync with a clean slate.
IDisposable[] disposables;
lock (_trackedDisposables)
{
disposables = _trackedDisposables.ToArray();
_trackedDisposables.Clear();
}
foreach (var disposable in disposables)
{
try { disposable.Dispose(); }
catch (Exception ex)
{
_logger.LogWarning(ex,
"Auxiliary subscriber {Type} threw during shutdown",
disposable.GetType().Name);
}
}
if (_actorSystem != null)
{
_logger.LogInformation("Shutting down Akka.NET actor system via CoordinatedShutdown...");
@@ -349,6 +381,31 @@ akka {{
"AuditLogIngestActor singleton created (gRPC server bound: {GrpcBound})",
grpcServer is not null);
// Audit Log (#23) M6 Bundle E (T7): subscribe the per-site stalled
// telemetry tracker to the actor system EventStream NOW that the
// system exists. The tracker mirrors every
// SiteAuditTelemetryStalledChanged publication (from
// SiteAuditReconciliationActor — wired in a later bundle) into the
// AuditCentralHealthSnapshot singleton so the central health surface
// sees per-site stalled state. The tracker is constructed here rather
// than in AddAuditLogCentralMaintenance because its ctor needs an
// ActorSystem, which is not a DI-resolvable singleton — it's owned
// by this hosted service. The snapshot singleton is resolvable;
// passing it in seeds the tracker's Apply() so both internal state
// and the snapshot stay in lock-step.
var auditCentralSnapshot = _serviceProvider
.GetService<ScadaLink.AuditLog.Central.AuditCentralHealthSnapshot>();
if (auditCentralSnapshot is not null)
{
var stalledTracker = new ScadaLink.AuditLog.Central.SiteAuditTelemetryStalledTracker(
_actorSystem!, auditCentralSnapshot);
lock (_trackedDisposables)
{
_trackedDisposables.Add(stalledTracker);
}
_logger.LogInformation("SiteAuditTelemetryStalledTracker subscribed to EventStream");
}
// Site Call Audit (#22) — central singleton mirrors the AuditLogIngest
// and NotificationOutbox patterns. M3's dual-write transaction routes
// SiteCalls upserts through AuditLogIngestActor's own scope-per-message

View File

@@ -24,8 +24,7 @@ public class CentralAuditRedactionFailureCounterTests : TestKit
[Fact]
public void Increment_Routes_To_Snapshot()
{
using var tracker = new SiteAuditTelemetryStalledTracker(Sys);
var snapshot = new AuditCentralHealthSnapshot(tracker);
var snapshot = new AuditCentralHealthSnapshot();
var counter = new CentralAuditRedactionFailureCounter(snapshot);
counter.Increment();
@@ -60,10 +59,10 @@ public class CentralAuditRedactionFailureCounterTests : TestKit
var services = new ServiceCollection();
services.AddSingleton<ILoggerFactory, NullLoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(NullLogger<>));
// The AuditCentralHealthSnapshot ctor takes the stalled tracker
// which itself needs an ActorSystem — register a real system
// (test-kit's Sys) so the DI graph composes.
services.AddSingleton<ActorSystem>(Sys);
// AuditCentralHealthSnapshot no longer takes a tracker dependency —
// the tracker is constructed later by the Akka bootstrap because its
// ctor needs an ActorSystem (not a DI-resolvable singleton). The
// snapshot itself composes purely from primitives.
services.AddAuditLog(config);
services.AddAuditLogCentralMaintenance(config);
using var provider = services.BuildServiceProvider();

View File

@@ -115,9 +115,10 @@ public class CentralAuditWriteFailuresTests : TestKit
{
// AuditCentralHealthSnapshot implements both writer surfaces; bumping
// through the writer interfaces is reflected on the read surface, and
// SiteAuditTelemetryStalled is sourced from the injected tracker.
using var tracker = new SiteAuditTelemetryStalledTracker(Sys);
var snapshot = new AuditCentralHealthSnapshot(tracker);
// the per-site stalled state is fed in via ApplyStalled — production
// wires that to a SiteAuditTelemetryStalledTracker, but the snapshot
// is testable in isolation against the same Apply surface.
var snapshot = new AuditCentralHealthSnapshot();
Assert.Equal(0, snapshot.CentralAuditWriteFailures);
Assert.Equal(0, snapshot.AuditRedactionFailure);
@@ -127,7 +128,11 @@ public class CentralAuditWriteFailuresTests : TestKit
((ICentralAuditWriteFailureCounter)snapshot).Increment();
((ScadaLink.AuditLog.Payload.IAuditRedactionFailureCounter)snapshot).Increment();
// Publish a stalled-changed event so the tracker registers a site.
// Wire the tracker so an EventStream publish reaches the snapshot.
// The tracker pushes into the snapshot's ApplyStalled when given
// the snapshot in its ctor; the tracker also keeps its own latch,
// but the snapshot read surface is what the central UI reads.
using var tracker = new SiteAuditTelemetryStalledTracker(Sys, snapshot);
Sys.EventStream.Publish(new SiteAuditTelemetryStalledChanged("siteA", Stalled: true));
AwaitAssert(() =>
{
@@ -143,9 +148,13 @@ public class CentralAuditWriteFailuresTests : TestKit
}
[Fact]
public void AuditCentralHealthSnapshot_Construction_Without_Tracker_Throws()
public void Snapshot_Empty_OnConstruction()
{
Assert.Throws<ArgumentNullException>(
() => new AuditCentralHealthSnapshot(null!));
// Sanity: the snapshot's three properties start at their zero values
// before any writer or stalled-event publication.
var snapshot = new AuditCentralHealthSnapshot();
Assert.Equal(0, snapshot.CentralAuditWriteFailures);
Assert.Equal(0, snapshot.AuditRedactionFailure);
Assert.Empty(snapshot.SiteAuditTelemetryStalled);
}
}