From 42333a72ed415cc2888d576e39adbee429cb6b8b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 19:07:44 -0400 Subject: [PATCH] feat(health): SiteAuditTelemetryStalledTracker subscribes to EventStream (#23 M6) --- .../SiteAuditTelemetryStalledTracker.cs | 158 ++++++++++++++++++ .../ServiceCollectionExtensions.cs | 11 ++ .../SiteAuditTelemetryStalledTrackerTests.cs | 116 +++++++++++++ 3 files changed, 285 insertions(+) create mode 100644 src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs create mode 100644 tests/ScadaLink.AuditLog.Tests/Central/SiteAuditTelemetryStalledTrackerTests.cs diff --git a/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs b/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs new file mode 100644 index 0000000..40624e7 --- /dev/null +++ b/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs @@ -0,0 +1,158 @@ +using System.Collections.Concurrent; +using Akka.Actor; +using Akka.Event; + +namespace ScadaLink.AuditLog.Central; + +/// +/// Audit Log (#23) M6 Bundle E (T7) — central singleton that subscribes to the +/// actor system's EventStream for +/// publications and maintains a per-site latched stalled-state map readable +/// via . Consumed by the M6 Bundle E +/// aggregator so the central health +/// surface can surface per-site "reconciliation isn't draining" without +/// coupling the publisher () to the +/// health collection plumbing. +/// +/// +/// +/// Why an internal actor. Akka.NET's only +/// supports subscribers — there is no callback or +/// channel-based overload. The tracker therefore spawns a small subscriber +/// actor that forwards each event into the shared +/// on the actor's thread, and +/// readers () take a copy off that dictionary on any +/// thread. Mirrors the DeadLetterMonitorActor shape — subscribe in +/// , unsubscribe in +/// , which the tracker triggers via a Stop +/// at . +/// +/// +/// Per-site latching. The publisher () +/// only publishes on stalled-state transitions, so the dictionary is the +/// authoritative latched state. Sites that have never published are absent +/// from the snapshot — the consumer surface treats absence as +/// Stalled=false (default healthy), the same default the reconciliation +/// actor's own internal latch uses. +/// +/// +/// Singleton lifecycle. Registered as a singleton via +/// ; +/// tears the internal subscriber down at host shutdown. +/// +/// +public sealed class SiteAuditTelemetryStalledTracker : IDisposable +{ + private readonly EventStream _eventStream; + private readonly ConcurrentDictionary _state = new(); + private readonly IActorRef? _subscriber; + private bool _disposed; + + /// + /// Construct around a bare . Intended for unit + /// tests where the caller wants to publish events without standing up an + /// actor system — the tracker registers a transient subscriber actor only + /// if the supplied stream is backed by an actor system. In the bare-stream + /// mode (no actor system) the tracker still exposes the + /// surface but cannot self-subscribe; production + /// callers always go through . + /// + /// + /// Subscribing to requires an , + /// which can only be created from an . The bare- + /// stream ctor therefore can NOT itself wire the subscriber — tests that + /// want event-driven updates must use the ActorSystem ctor (or push state + /// directly via ). The tests in + /// SiteAuditTelemetryStalledTrackerTests use the ActorSystem ctor + /// via Akka.TestKit so they exercise the production subscribe path. + /// + public SiteAuditTelemetryStalledTracker(EventStream eventStream) + { + _eventStream = eventStream ?? throw new ArgumentNullException(nameof(eventStream)); + // No subscriber actor — see the remarks above. Apply() is exposed + // internally so the ActorSystem ctor's internal forwarder can update + // state without re-implementing the dictionary write. + _subscriber = null; + } + + /// + /// Production ctor: subscribes a small internal actor to the supplied + /// system's EventStream so every published + /// updates the latched + /// per-site map. tears the subscriber down. + /// + public SiteAuditTelemetryStalledTracker(ActorSystem actorSystem) + { + ArgumentNullException.ThrowIfNull(actorSystem); + _eventStream = actorSystem.EventStream; + // Anonymous subscriber actor scoped to the system; props build it + // with a callback into THIS tracker's Apply method so the actor's + // single-threaded receive serialises every dictionary write. + _subscriber = actorSystem.ActorOf( + Props.Create(() => new StalledChangedSubscriber(this)), + name: $"site-audit-stalled-tracker-{Guid.NewGuid():N}"); + // Subscribe synchronously from the ctor so the subscription is in + // place before the tracker is returned to the caller — the actor's + // own PreStart runs asynchronously and would otherwise race the + // first publish. EventStream.Subscribe is thread-safe. + _eventStream.Subscribe(_subscriber, typeof(SiteAuditTelemetryStalledChanged)); + } + + /// + /// Returns a defensive copy of the per-site latched stalled state. + /// Absent sites are interpreted as Stalled=false by consumers. + /// + public IReadOnlyDictionary Snapshot() => + new Dictionary(_state); + + /// + /// Applied by the internal subscriber actor on every + /// publication. Exposed + /// internally so tests against the bare-stream ctor can still drive the + /// tracker, but the production path always goes through the actor. + /// + internal void Apply(SiteAuditTelemetryStalledChanged evt) + { + if (evt is null) return; + _state[evt.SiteId] = evt.Stalled; + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + if (_subscriber is not null) + { + // Unsubscribe runs in PostStop on the subscriber actor; Stop is + // fire-and-forget but the actor's PostStop hook is guaranteed to + // run before its mailbox is collected. + _subscriber.Tell(PoisonPill.Instance); + } + } + + /// + /// Internal subscriber actor — receives every + /// off the EventStream and + /// forwards it into the parent . + /// Unlike DeadLetterMonitorActor, the subscription is registered by + /// the tracker constructor BEFORE this actor begins processing messages so + /// publishes that arrive between actor creation and PreStart cannot be + /// missed. Unsubscribe still runs in . + /// + private sealed class StalledChangedSubscriber : ReceiveActor + { + private readonly SiteAuditTelemetryStalledTracker _parent; + + public StalledChangedSubscriber(SiteAuditTelemetryStalledTracker parent) + { + _parent = parent; + Receive(evt => _parent.Apply(evt)); + } + + protected override void PostStop() + { + Context.System.EventStream.Unsubscribe(Self, typeof(SiteAuditTelemetryStalledChanged)); + base.PostStop(); + } + } +} diff --git a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs index 7bab904..5cde08b 100644 --- a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs @@ -259,6 +259,17 @@ public static class ServiceCollectionExtensions .Bind(config.GetSection(PartitionMaintenanceSectionName)); services.AddHostedService(); + // M6 Bundle E (T7): central-singleton tracker for per-site + // SiteAuditTelemetryStalledChanged events published by + // SiteAuditReconciliationActor. Singleton so the EventStream + // subscription registers exactly once on host startup; the tracker's + // ctor needs an ActorSystem which is composed by the AkkaHostedService + // — we go through a factory so the SP is the source of truth for the + // system reference rather than re-resolving it on the consumer side. + services.AddSingleton(sp => + new SiteAuditTelemetryStalledTracker( + sp.GetRequiredService())); + return services; } } diff --git a/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditTelemetryStalledTrackerTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditTelemetryStalledTrackerTests.cs new file mode 100644 index 0000000..7c375a1 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditTelemetryStalledTrackerTests.cs @@ -0,0 +1,116 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using ScadaLink.AuditLog.Central; + +namespace ScadaLink.AuditLog.Tests.Central; + +/// +/// Bundle E (M6-T7) tests for . +/// The tracker subscribes to the actor system's EventStream for +/// publications and maintains a +/// per-site latch the central health surface can read. Since reconciliation is +/// central-driven, the "stalled" state semantically belongs to central — not +/// to the per-site +/// payload (which the site itself emits). The tracker therefore lives as a +/// central singleton, not on the site health collector. +/// +public class SiteAuditTelemetryStalledTrackerTests : TestKit +{ + /// + /// Helper: publishes a stalled-changed event on the actor system's + /// EventStream and waits a moment for the tracker's subscribe callback to + /// run. AwaitAssert avoids racing on the stream's async fan-out. + /// + private void PublishAndWait(SiteAuditTelemetryStalledTracker tracker, SiteAuditTelemetryStalledChanged evt) + { + Sys.EventStream.Publish(evt); + AwaitAssert( + () => + { + var snapshot = tracker.Snapshot(); + Assert.True(snapshot.TryGetValue(evt.SiteId, out var stalled), + $"tracker did not record event for {evt.SiteId}"); + Assert.Equal(evt.Stalled, stalled); + }, + duration: TimeSpan.FromSeconds(2), + interval: TimeSpan.FromMilliseconds(20)); + } + + [Fact] + public void Initial_Snapshot_IsEmpty() + { + using var tracker = new SiteAuditTelemetryStalledTracker(Sys); + + var snapshot = tracker.Snapshot(); + + Assert.Empty(snapshot); + } + + [Fact] + public void StalledTrue_Event_TrackerReports_Stalled() + { + using var tracker = new SiteAuditTelemetryStalledTracker(Sys); + + PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteA", Stalled: true)); + + var snapshot = tracker.Snapshot(); + Assert.True(snapshot["siteA"]); + } + + [Fact] + public void StalledFalse_Event_TrackerReports_NotStalled() + { + using var tracker = new SiteAuditTelemetryStalledTracker(Sys); + + // First flip the site into stalled so the false transition has a + // prior value to overwrite — mirrors how the reconciliation actor + // only publishes false after a true. + PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteA", Stalled: true)); + PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteA", Stalled: false)); + + var snapshot = tracker.Snapshot(); + Assert.False(snapshot["siteA"]); + } + + [Fact] + public void Multiple_Sites_Tracked_Independently() + { + using var tracker = new SiteAuditTelemetryStalledTracker(Sys); + + PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteA", Stalled: true)); + PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteB", Stalled: false)); + PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteC", Stalled: true)); + + var snapshot = tracker.Snapshot(); + Assert.Equal(3, snapshot.Count); + Assert.True(snapshot["siteA"]); + Assert.False(snapshot["siteB"]); + Assert.True(snapshot["siteC"]); + } + + [Fact] + public void Constructor_With_Null_ActorSystem_Throws() + { + Assert.Throws( + () => new SiteAuditTelemetryStalledTracker((ActorSystem)null!)); + } + + [Fact] + public void Dispose_Unsubscribes_From_EventStream() + { + var tracker = new SiteAuditTelemetryStalledTracker(Sys); + + PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteA", Stalled: true)); + + tracker.Dispose(); + + // After dispose any further events are ignored — the snapshot + // reflects the last known state at dispose time. + Sys.EventStream.Publish(new SiteAuditTelemetryStalledChanged("siteA", Stalled: false)); + + // Give the stream a moment in case the unsubscribe is racey; the + // assertion is that siteA stays at true. + Thread.Sleep(50); + Assert.True(tracker.Snapshot()["siteA"]); + } +}