using System.Collections.Concurrent;
using Akka.Actor;
using Akka.Event;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
/// <summary>
/// Audit Log (#23) M6 Bundle E (T7) — central singleton that subscribes to the
/// actor system's EventStream for <see cref="SiteAuditTelemetryStalledChanged"/>
/// publications and maintains a per-site latched stalled-state map readable
/// via <see cref="Snapshot"/>. Consumed by the M6 Bundle E
/// <see cref="AuditCentralHealthSnapshot"/> aggregator so the central health
/// surface can surface per-site "reconciliation isn't draining" without
/// coupling the publisher (the site reconciliation actor) to the
/// health collection plumbing.
/// </summary>
/// <remarks>
/// <para>
/// <b>Why an internal actor.</b> Akka.NET's <see cref="EventStream"/> only
/// supports <see cref="IActorRef"/> subscribers — there is no callback or
/// channel-based overload. The tracker therefore spawns a small subscriber
/// actor that forwards each event into the shared
/// <see cref="ConcurrentDictionary{TKey, TValue}"/> on the actor's thread, and
/// readers (<see cref="Snapshot"/>) take a copy off that dictionary on any
/// thread. Follows the DeadLetterMonitorActor shape, except that the tracker
/// constructor registers the subscription synchronously so it is in place
/// before the tracker is returned to the caller. The actor unsubscribes in
/// its <c>PostStop</c> hook, which the tracker triggers by sending a
/// <see cref="PoisonPill"/> from <see cref="Dispose"/>.
/// </para>
/// <para>
/// <b>Per-site latching.</b> The publisher (the site reconciliation actor)
/// only publishes on stalled-state transitions, so the dictionary is the
/// authoritative latched state. Sites that have never published are absent
/// from the snapshot — the consumer surface treats absence as
/// <c>Stalled=false</c> (default healthy), the same default the reconciliation
/// actor's own internal latch uses.
/// </para>
/// <para>
/// <b>Singleton lifecycle.</b> Registered as a singleton by the central
/// composition root; <see cref="Dispose"/>
/// tears the internal subscriber down at host shutdown.
/// </para>
/// </remarks>
public sealed class SiteAuditTelemetryStalledTracker : IDisposable
{
    private readonly EventStream _eventStream;

    // Latched per-site stalled flag keyed by site ID. Written by Apply and
    // copied by Snapshot().
    // NOTE(review): key/value types are inferred from usage (evt.SiteId as the
    // key, evt.Stalled as the flag) — confirm SiteId is a string.
    private readonly ConcurrentDictionary<string, bool> _state = new();

    // Null when the tracker was built from a bare EventStream (no actor system).
    private readonly IActorRef? _subscriber;

    // Optional sink that receives every applied event in addition to _state.
    private readonly AuditCentralHealthSnapshot? _snapshot;

    private bool _disposed;

    /// <summary>
    /// Construct around a bare <see cref="EventStream"/>. Intended for unit
    /// tests where the caller does not want the tracker to spawn a subscriber
    /// actor. In this mode the tracker still exposes the
    /// <see cref="Snapshot"/> surface but does not self-subscribe; production
    /// callers go through the <see cref="ActorSystem"/> constructors.
    /// </summary>
    /// <remarks>
    /// Subscribing to <see cref="EventStream"/> requires an <see cref="IActorRef"/>,
    /// which can only be created from an <see cref="ActorSystem"/>. The bare-
    /// stream ctor therefore can NOT itself wire the subscriber — tests that
    /// want event-driven updates must use the ActorSystem ctor (or push state
    /// directly via <see cref="Apply"/>).
    /// </remarks>
    /// <param name="eventStream">The actor system event stream to observe.</param>
    /// <exception cref="ArgumentNullException"><paramref name="eventStream"/> is null.</exception>
    public SiteAuditTelemetryStalledTracker(EventStream eventStream)
        : this(eventStream, snapshot: null)
    {
    }

    /// <summary>
    /// Bare-stream ctor with an optional snapshot sink. No subscriber actor is
    /// created (there is no actor system to host it), but callers that drive
    /// the tracker via <see cref="Apply"/> still get every update mirrored
    /// into <paramref name="snapshot"/>.
    /// </summary>
    /// <param name="eventStream">The actor system event stream to observe.</param>
    /// <param name="snapshot">Optional central health snapshot to mirror stalled-state changes into.</param>
    /// <exception cref="ArgumentNullException"><paramref name="eventStream"/> is null.</exception>
    public SiteAuditTelemetryStalledTracker(EventStream eventStream, AuditCentralHealthSnapshot? snapshot)
    {
        _eventStream = eventStream ?? throw new ArgumentNullException(nameof(eventStream));

        // No subscriber actor in bare-stream mode — an IActorRef can only be
        // created from an ActorSystem, which this overload does not receive.
        _subscriber = null;
        _snapshot = snapshot;
    }

    /// <summary>
    /// Production ctor: subscribes a small internal actor to the supplied
    /// system's EventStream so every published
    /// <see cref="SiteAuditTelemetryStalledChanged"/> updates the latched
    /// per-site map. <see cref="Dispose"/> tears the subscriber down.
    /// </summary>
    /// <param name="actorSystem">The actor system whose EventStream will be subscribed.</param>
    /// <exception cref="ArgumentNullException"><paramref name="actorSystem"/> is null.</exception>
    public SiteAuditTelemetryStalledTracker(ActorSystem actorSystem)
        : this(actorSystem, snapshot: null)
    {
    }

    /// <summary>
    /// Production ctor with a snapshot sink — every observed
    /// <see cref="SiteAuditTelemetryStalledChanged"/> is also passed to
    /// <paramref name="snapshot"/> (when non-null) so the central health
    /// surface sees per-site stalled state without re-reading the tracker.
    /// </summary>
    /// <param name="actorSystem">The actor system whose EventStream will be subscribed.</param>
    /// <param name="snapshot">Optional central health snapshot to mirror stalled-state changes into.</param>
    /// <exception cref="ArgumentNullException"><paramref name="actorSystem"/> is null.</exception>
    public SiteAuditTelemetryStalledTracker(ActorSystem actorSystem, AuditCentralHealthSnapshot? snapshot)
    {
        ArgumentNullException.ThrowIfNull(actorSystem);

        _eventStream = actorSystem.EventStream;
        _snapshot = snapshot;

        // Uniquely named top-level actor; it forwards every event to THIS
        // tracker's Apply method from its own message loop.
        _subscriber = actorSystem.ActorOf(
            Props.Create(() => new StalledChangedSubscriber(this)),
            name: $"site-audit-stalled-tracker-{Guid.NewGuid():N}");

        // Subscribe synchronously from the ctor so the subscription is in
        // place before the tracker is returned to the caller — the actor's
        // own PreStart runs asynchronously and would otherwise race the
        // first publish.
        _eventStream.Subscribe(_subscriber, typeof(SiteAuditTelemetryStalledChanged));
    }

    /// <summary>
    /// Returns a defensive copy of the per-site latched stalled state.
    /// Absent sites are interpreted as <c>Stalled=false</c> by consumers.
    /// </summary>
    /// <returns>A new dictionary mapping each known site ID to its current stalled state.</returns>
    public IReadOnlyDictionary<string, bool> Snapshot() =>
        new Dictionary<string, bool>(_state);

    /// <summary>
    /// Records the stalled state carried by <paramref name="evt"/> for its
    /// site and mirrors the event into the optional health snapshot. Called
    /// by the internal subscriber actor for every event it receives; exposed
    /// internally so tests against the bare-stream ctor can still drive the
    /// tracker.
    /// </summary>
    /// <param name="evt">The stalled-state change event to apply. A null event is ignored.</param>
    internal void Apply(SiteAuditTelemetryStalledChanged evt)
    {
        if (evt is null)
        {
            return;
        }

        // Last write wins: the entry always holds the most recent flag seen
        // for the site.
        _state[evt.SiteId] = evt.Stalled;

        // The snapshot sink is optional (test composition roots may skip it),
        // hence the null-conditional call.
        _snapshot?.ApplyStalled(evt);
    }

    /// <summary>
    /// Disposes the tracker: removes the internal subscriber from the
    /// EventStream and asks the subscriber actor to stop. Safe to call more
    /// than once; calls after the first are no-ops.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        if (_subscriber is null)
        {
            // Bare-stream mode: nothing was subscribed, nothing to stop.
            return;
        }

        // Mirror the synchronous Subscribe in the ctor: drop the subscription
        // here rather than waiting for the actor's PostStop, so events
        // published after Dispose returns are no longer routed to the
        // subscriber. PostStop repeats the call as a harmless backstop.
        _eventStream.Unsubscribe(_subscriber, typeof(SiteAuditTelemetryStalledChanged));

        // PoisonPill is fire-and-forget; the actor stops when it reaches the
        // pill in its mailbox, after any events queued ahead of it.
        _subscriber.Tell(PoisonPill.Instance);
    }

    /// <summary>
    /// Internal subscriber actor — receives every
    /// <see cref="SiteAuditTelemetryStalledChanged"/> delivered to it and
    /// forwards it into the parent <see cref="SiteAuditTelemetryStalledTracker"/>.
    /// The tracker constructor registers the subscription before this actor
    /// starts; the actor re-registers it in <see cref="PreStart"/> so the
    /// subscription survives a supervisor restart, and removes it in
    /// <see cref="PostStop"/>.
    /// </summary>
    private sealed class StalledChangedSubscriber : ReceiveActor
    {
        private readonly SiteAuditTelemetryStalledTracker _parent;

        /// <summary>
        /// Initializes a new subscriber actor that forwards events to the given tracker.
        /// </summary>
        /// <param name="parent">The parent tracker whose <see cref="Apply"/> method will be called for each event.</param>
        public StalledChangedSubscriber(SiteAuditTelemetryStalledTracker parent)
        {
            _parent = parent;
            Receive<SiteAuditTelemetryStalledChanged>(evt => _parent.Apply(evt));
        }

        /// <inheritdoc />
        protected override void PreStart()
        {
            // Akka.NET's default PreRestart calls PostStop (which unsubscribes)
            // and the default PostRestart calls PreStart. Without this
            // re-subscribe, one exception thrown while applying an event would
            // restart the actor and silently drop the subscription for good.
            // On first start the ctor has already subscribed; subscribing the
            // same actor to the same channel again is a no-op.
            Context.System.EventStream.Subscribe(Self, typeof(SiteAuditTelemetryStalledChanged));
            base.PreStart();
        }

        /// <inheritdoc />
        protected override void PostStop()
        {
            Context.System.EventStream.Unsubscribe(Self, typeof(SiteAuditTelemetryStalledChanged));
            base.PostStop();
        }
    }
}