diff --git a/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs b/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs
new file mode 100644
index 0000000..40624e7
--- /dev/null
+++ b/src/ScadaLink.AuditLog/Central/SiteAuditTelemetryStalledTracker.cs
@@ -0,0 +1,158 @@
+using System.Collections.Concurrent;
+using Akka.Actor;
+using Akka.Event;
+
+namespace ScadaLink.AuditLog.Central;
+
+/// <summary>
+/// Audit Log (#23) M6 Bundle E (T7) — central singleton that subscribes to the
+/// actor system's EventStream for <see cref="SiteAuditTelemetryStalledChanged"/>
+/// publications and maintains a per-site latched stalled-state map readable
+/// via <see cref="Snapshot"/>. Consumed by the M6 Bundle E
+/// aggregator so the central health
+/// surface can surface per-site "reconciliation isn't draining" without
+/// coupling the publisher (<c>SiteAuditReconciliationActor</c>) to the
+/// health collection plumbing.
+/// </summary>
+/// <remarks>
+/// <para>
+/// <b>Why an internal actor.</b> Akka.NET's <see cref="EventStream"/> only
+/// supports <see cref="IActorRef"/> subscribers — there is no callback or
+/// channel-based overload. The tracker therefore spawns a small subscriber
+/// actor that forwards each event into the shared
+/// <see cref="ConcurrentDictionary{TKey,TValue}"/> on the actor's thread, and
+/// readers (<see cref="Snapshot"/>) take a copy off that dictionary on any
+/// thread. Follows the <c>DeadLetterMonitorActor</c> shape, except that the
+/// tracker ctor subscribes (rather than <c>PreStart</c>); the unsubscribe in
+/// <c>PostStop</c> runs when the tracker stops the actor
+/// at <see cref="Dispose"/>.
+/// </para>
+/// <para>
+/// <b>Per-site latching.</b> The publisher (<c>SiteAuditReconciliationActor</c>)
+/// only publishes on stalled-state transitions, so the dictionary is the
+/// authoritative latched state. Sites that have never published are absent
+/// from the snapshot — the consumer surface treats absence as
+/// <c>Stalled=false</c> (default healthy), the same default the reconciliation
+/// actor's own internal latch uses.
+/// </para>
+/// <para>
+/// <b>Singleton lifecycle.</b> Registered as a singleton via
+/// <c>ServiceCollectionExtensions</c>;
+/// <see cref="Dispose"/> tears the internal subscriber down at host shutdown.
+/// </para>
+/// </remarks>
+public sealed class SiteAuditTelemetryStalledTracker : IDisposable
+{
+    private readonly EventStream _eventStream; // stream passed to, or taken from the system in, the ctor
+    private readonly ConcurrentDictionary<string, bool> _state = new(); // site id -> latched Stalled flag
+    private readonly IActorRef? _subscriber; // null when built by the bare-stream ctor
+    private bool _disposed; // set by the first Dispose call
+
+    /// <summary>
+    /// Builds a tracker around a bare <see cref="EventStream"/> without creating
+    /// a subscriber actor. This ctor never subscribes to the stream, so the
+    /// tracker's state changes only when <see cref="Apply"/> is called directly;
+    /// <see cref="Snapshot"/> works as usual. Intended for unit tests.
+    /// </summary>
+    /// <remarks>
+    /// <see cref="EventStream"/> subscriptions take an <see cref="IActorRef"/>,
+    /// and this ctor has no <see cref="ActorSystem"/> to create one from. Callers
+    /// that need event-driven updates must use the <see cref="ActorSystem"/>
+    /// ctor instead.
+    /// </remarks>
+    /// <param name="eventStream">Stream to hold a reference to; must not be null.</param>
+    /// <exception cref="ArgumentNullException">
+    /// <paramref name="eventStream"/> is null.
+    /// </exception>
+    public SiteAuditTelemetryStalledTracker(EventStream eventStream)
+    {
+        ArgumentNullException.ThrowIfNull(eventStream);
+
+        // Bare-stream mode: no forwarding actor exists, so nothing is ever
+        // delivered from the stream and Dispose has nothing to stop.
+        _subscriber = null;
+        _eventStream = eventStream;
+    }
+
+
+    /// <summary>
+    /// Production ctor: creates a small forwarding actor in
+    /// <paramref name="actorSystem"/> and subscribes it to that system's
+    /// EventStream for <see cref="SiteAuditTelemetryStalledChanged"/>, so each
+    /// published event updates the latched per-site map.
+    /// <see cref="Dispose"/> tears the forwarding actor down.
+    /// </summary>
+    /// <exception cref="ArgumentNullException"><paramref name="actorSystem"/> is null.</exception>
+    public SiteAuditTelemetryStalledTracker(ActorSystem actorSystem)
+    {
+        if (actorSystem is null) throw new ArgumentNullException(nameof(actorSystem));
+        _eventStream = actorSystem.EventStream;
+
+        // The forwarder calls back into THIS tracker's Apply; a GUID suffix
+        // keeps the actor name unique within the system.
+        var forwarderProps = Props.Create(() => new StalledChangedSubscriber(this));
+        var forwarderName = $"site-audit-stalled-tracker-{Guid.NewGuid():N}";
+        _subscriber = actorSystem.ActorOf(forwarderProps, forwarderName);
+        // Subscribe here rather than in the actor's PreStart so the
+        // subscription already exists when this ctor returns.
+        _eventStream.Subscribe(_subscriber, typeof(SiteAuditTelemetryStalledChanged));
+    }
+
+    /// <summary>
+    /// Returns a point-in-time copy of the per-site latched stalled state;
+    /// sites absent from the copy are treated as <c>Stalled=false</c> by consumers.
+    /// </summary>
+    public IReadOnlyDictionary<string, bool> Snapshot() =>
+        new Dictionary<string, bool>(_state);
+
+    /// <summary>
+    /// Latches <paramref name="evt"/>'s stalled flag for its site, replacing any
+    /// earlier value; null is ignored. Called by the subscriber actor and by tests.
+    /// </summary>
+    internal void Apply(SiteAuditTelemetryStalledChanged evt)
+    {
+        if (evt is not null)
+        {
+            _state[evt.SiteId] = evt.Stalled;
+        }
+    }
+
+    /// <summary>Stops listening for stalled-changed events; safe to call more than once.</summary>
+    public void Dispose()
+    {
+        if (_disposed) return;
+        _disposed = true;
+        if (_subscriber is null) return;
+
+        // Drop the subscription synchronously (mirroring the ctor's synchronous
+        // Subscribe) instead of waiting for PostStop, then stop the actor.
+        _eventStream.Unsubscribe(_subscriber, typeof(SiteAuditTelemetryStalledChanged));
+        _subscriber.Tell(PoisonPill.Instance);
+    }
+
+    /// <summary>
+    /// Internal subscriber actor — receives every
+    /// <see cref="SiteAuditTelemetryStalledChanged"/> delivered to it and
+    /// forwards it into the owning <see cref="SiteAuditTelemetryStalledTracker"/>
+    /// via <see cref="SiteAuditTelemetryStalledTracker.Apply"/>. It does not
+    /// subscribe itself: the tracker ctor registers the subscription before
+    /// returning. The actor only unsubscribes, in <see cref="PostStop"/>.
+    /// </summary>
+    private sealed class StalledChangedSubscriber : ReceiveActor
+    {
+        private readonly SiteAuditTelemetryStalledTracker _parent;
+
+        public StalledChangedSubscriber(SiteAuditTelemetryStalledTracker parent)
+        {
+            _parent = parent;
+            // Explicit message type: it cannot be inferred from an untyped lambda.
+            Receive<SiteAuditTelemetryStalledChanged>(evt => _parent.Apply(evt));
+        }
+
+        protected override void PostStop()
+        {
+            Context.System.EventStream.Unsubscribe(Self, typeof(SiteAuditTelemetryStalledChanged));
+            base.PostStop();
+        }
+    }
+}
diff --git a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs
index 7bab904..5cde08b 100644
--- a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs
+++ b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs
@@ -259,6 +259,17 @@ public static class ServiceCollectionExtensions
.Bind(config.GetSection(PartitionMaintenanceSectionName));
services.AddHostedService();
+ // M6 Bundle E (T7): central-singleton tracker for per-site
+ // SiteAuditTelemetryStalledChanged events published by
+ // SiteAuditReconciliationActor. Singleton so the EventStream
+ // subscription registers exactly once on host startup; the tracker's
+ // ctor needs an ActorSystem which is composed by the AkkaHostedService
+ // — we go through a factory so the SP is the source of truth for the
+ // system reference rather than re-resolving it on the consumer side.
+ services.AddSingleton(sp =>
+ new SiteAuditTelemetryStalledTracker(
+ sp.GetRequiredService()));
+
return services;
}
}
diff --git a/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditTelemetryStalledTrackerTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditTelemetryStalledTrackerTests.cs
new file mode 100644
index 0000000..7c375a1
--- /dev/null
+++ b/tests/ScadaLink.AuditLog.Tests/Central/SiteAuditTelemetryStalledTrackerTests.cs
@@ -0,0 +1,116 @@
+using Akka.Actor;
+using Akka.TestKit.Xunit2;
+using ScadaLink.AuditLog.Central;
+
+namespace ScadaLink.AuditLog.Tests.Central;
+
+/// <summary>
+/// Bundle E (M6-T7) tests for <see cref="SiteAuditTelemetryStalledTracker"/>.
+/// The tracker subscribes to the actor system's EventStream for
+/// <see cref="SiteAuditTelemetryStalledChanged"/> publications and maintains a
+/// per-site latch the central health surface can read. Since reconciliation is
+/// central-driven, the "stalled" state semantically belongs to central — not
+/// to the per-site
+/// payload (which the site itself emits). The tracker therefore lives as a
+/// central singleton, not on the site health collector.
+/// </summary>
+public class SiteAuditTelemetryStalledTrackerTests : TestKit
+{
+    /// <summary>
+    /// Helper: publishes <paramref name="evt"/> on the test actor system's
+    /// EventStream, then polls the tracker (every 20 ms, for up to 2 s) until
+    /// its snapshot shows the event's stalled value for the event's site.
+    /// </summary>
+    private void PublishAndWait(SiteAuditTelemetryStalledTracker tracker, SiteAuditTelemetryStalledChanged evt)
+    {
+        var timeout = TimeSpan.FromSeconds(2);
+        var pollEvery = TimeSpan.FromMilliseconds(20);
+        void AssertRecorded()
+        {
+            var current = tracker.Snapshot();
+            var found = current.TryGetValue(evt.SiteId, out var stalled);
+            Assert.True(found, $"tracker did not record event for {evt.SiteId}");
+            Assert.Equal(evt.Stalled, stalled);
+        }
+        Sys.EventStream.Publish(evt);
+        AwaitAssert(AssertRecorded, duration: timeout, interval: pollEvery);
+    }
+
+    // A freshly built tracker has recorded no sites.
+    [Fact]
+    public void Initial_Snapshot_IsEmpty()
+    {
+        using var sut = new SiteAuditTelemetryStalledTracker(Sys);
+
+        var sites = sut.Snapshot();
+        Assert.Empty(sites);
+    }
+
+    // A published Stalled=true event shows up as true for that site.
+    [Fact]
+    public void StalledTrue_Event_TrackerReports_Stalled()
+    {
+        var stalledEvent = new SiteAuditTelemetryStalledChanged("siteA", Stalled: true);
+        using var sut = new SiteAuditTelemetryStalledTracker(Sys);
+
+        PublishAndWait(sut, stalledEvent);
+        Assert.True(sut.Snapshot()["siteA"]);
+    }
+
+    // A Stalled=false event overwrites an earlier Stalled=true for the same site.
+    [Fact]
+    public void StalledFalse_Event_TrackerReports_NotStalled()
+    {
+        using var sut = new SiteAuditTelemetryStalledTracker(Sys);
+        var becameStalled = new SiteAuditTelemetryStalledChanged("siteA", Stalled: true);
+        var recovered = new SiteAuditTelemetryStalledChanged("siteA", Stalled: false);
+
+        // Go through true first so the false transition has a prior value to
+        // replace, matching how the reconciliation actor publishes.
+        PublishAndWait(sut, becameStalled);
+        PublishAndWait(sut, recovered);
+        Assert.False(sut.Snapshot()["siteA"]);
+    }
+
+    // Each site keeps its own flag; updating one site leaves the others alone.
+    [Fact]
+    public void Multiple_Sites_Tracked_Independently()
+    {
+        using var sut = new SiteAuditTelemetryStalledTracker(Sys);
+        foreach (var (site, stalled) in new[] { ("siteA", true), ("siteB", false), ("siteC", true) })
+        {
+            PublishAndWait(sut, new SiteAuditTelemetryStalledChanged(site, Stalled: stalled));
+        }
+        var bySite = sut.Snapshot();
+        Assert.Equal(3, bySite.Count);
+        Assert.True(bySite["siteA"]);
+        Assert.False(bySite["siteB"]);
+        Assert.True(bySite["siteC"]);
+    }
+
+    // Passing a null ActorSystem must be rejected by the production ctor.
+    [Fact]
+    public void Constructor_With_Null_ActorSystem_Throws()
+    {
+        Assert.Throws<ArgumentNullException>(() => new SiteAuditTelemetryStalledTracker((ActorSystem)null!));
+    }
+
+    // Once disposed, the tracker keeps its last state and ignores new events.
+    [Fact]
+    public void Dispose_Unsubscribes_From_EventStream()
+    {
+        var sut = new SiteAuditTelemetryStalledTracker(Sys);
+        var stalledBeforeDispose = new SiteAuditTelemetryStalledChanged("siteA", Stalled: true);
+        var clearedAfterDispose = new SiteAuditTelemetryStalledChanged("siteA", Stalled: false);
+        PublishAndWait(sut, stalledBeforeDispose);
+
+        sut.Dispose();
+        Sys.EventStream.Publish(clearedAfterDispose);
+
+        // Allow a short grace period in case the late event were (wrongly)
+        // still delivered; siteA must remain at the pre-dispose value, true.
+        Thread.Sleep(50);
+        var afterDispose = sut.Snapshot();
+        Assert.True(afterDispose["siteA"]);
+    }
+}