ef49b55cf6
The snapshot's per-site stalled latch now lives on the snapshot itself and is fed by SiteAuditTelemetryStalledTracker via ApplyStalled, removing the chain that required ActorSystem at DI composition time. The tracker is now constructed by AkkaHostedService once ActorSystem.Create returns, with a lock-guarded auxiliary-disposable list so concurrent host start/stop in tests cannot race the enumeration.
189 lines
8.7 KiB
C#
189 lines
8.7 KiB
C#
using System.Collections.Concurrent;
|
|
using Akka.Actor;
|
|
using Akka.Event;
|
|
|
|
namespace ScadaLink.AuditLog.Central;
|
|
|
|
/// <summary>
/// Audit Log (#23) M6 Bundle E (T7) — central tracker that subscribes to the
/// actor system's EventStream for <see cref="SiteAuditTelemetryStalledChanged"/>
/// publications and maintains a per-site latched stalled-state map readable
/// via <see cref="Snapshot"/>. When constructed with an
/// <see cref="AuditCentralHealthSnapshot"/>, every applied event is also pushed
/// onto that snapshot (<c>ApplyStalled</c>), so the central health surface can
/// report per-site "reconciliation isn't draining" without coupling the
/// publisher (<see cref="SiteAuditReconciliationActor"/>) to the health
/// collection plumbing.
/// </summary>
/// <remarks>
/// <para>
/// <b>Why an internal actor.</b> Akka.NET's <see cref="EventStream"/> only
/// supports <see cref="IActorRef"/> subscribers — there is no callback or
/// channel-based overload. The tracker therefore spawns a small subscriber
/// actor that forwards each event into the shared
/// <see cref="ConcurrentDictionary{TKey,TValue}"/> on the actor's thread, and
/// readers (<see cref="Snapshot"/>) take a copy of that dictionary on any
/// thread. Unlike <c>DeadLetterMonitorActor</c>, which subscribes in PreStart,
/// the subscription is registered synchronously by the tracker constructor so
/// no publish can slip in before the actor starts. <see cref="Dispose"/>
/// unsubscribes synchronously and then stops the actor; the actor's PostStop
/// unsubscribes again as a backstop.
/// </para>
/// <para>
/// <b>Restart safety.</b> Akka's default PreRestart calls PostStop, which
/// would unsubscribe the actor. Because nothing re-subscribes after a restart,
/// the subscriber overrides PreRestart so that a failure while applying one
/// event does not leave the tracker permanently deaf.
/// </para>
/// <para>
/// <b>Per-site latching.</b> The publisher (<see cref="SiteAuditReconciliationActor"/>)
/// only publishes on stalled-state transitions, so the dictionary is the
/// authoritative latched state. Sites that have never published are absent
/// from the snapshot — the consumer surface treats absence as
/// <c>Stalled=false</c> (default healthy), the same default the reconciliation
/// actor's own internal latch uses.
/// </para>
/// <para>
/// <b>Lifecycle.</b> The production instance is created by the Akka hosted
/// service once the <see cref="ActorSystem"/> exists, so the health snapshot
/// no longer needs an ActorSystem at DI composition time. It is disposed at
/// host shutdown, and <see cref="Dispose"/> tears the internal subscriber down.
/// </para>
/// </remarks>
public sealed class SiteAuditTelemetryStalledTracker : IDisposable
{
    private readonly EventStream _eventStream;
    private readonly ConcurrentDictionary<string, bool> _state = new();
    private readonly IActorRef? _subscriber;
    private readonly AuditCentralHealthSnapshot? _snapshot;
    private bool _disposed;

    /// <summary>
    /// Construct around a bare <see cref="EventStream"/> with no snapshot sink.
    /// Intended for unit tests that want to drive the tracker without standing
    /// up an actor system. This overload never subscribes to the stream; state
    /// only changes through <see cref="Apply"/>. Production callers go through
    /// <see cref="SiteAuditTelemetryStalledTracker(ActorSystem)"/> or
    /// <see cref="SiteAuditTelemetryStalledTracker(ActorSystem, AuditCentralHealthSnapshot?)"/>.
    /// </summary>
    /// <remarks>
    /// Subscribing to <see cref="EventStream"/> requires an <see cref="IActorRef"/>,
    /// which can only be created from an <see cref="ActorSystem"/>. The bare-
    /// stream ctor therefore can NOT wire the subscriber. Tests that want
    /// event-driven updates must use an ActorSystem ctor, or push state
    /// directly via <see cref="Apply"/>. The tests in
    /// <c>SiteAuditTelemetryStalledTrackerTests</c> use the ActorSystem ctor
    /// via Akka.TestKit so they exercise the production subscribe path.
    /// </remarks>
    /// <param name="eventStream">Stream retained by the tracker; never subscribed to by this overload.</param>
    /// <exception cref="ArgumentNullException"><paramref name="eventStream"/> is null.</exception>
    public SiteAuditTelemetryStalledTracker(EventStream eventStream)
        : this(eventStream, snapshot: null)
    {
    }

    /// <summary>
    /// Bare-stream ctor with an optional snapshot sink. Like
    /// <see cref="SiteAuditTelemetryStalledTracker(EventStream)"/> it cannot
    /// subscribe (no actor system). Tests that drive the tracker via
    /// <see cref="Apply"/> still get every update mirrored onto
    /// <paramref name="snapshot"/>.
    /// </summary>
    /// <param name="eventStream">Stream retained by the tracker; never subscribed to by this overload.</param>
    /// <param name="snapshot">Optional health snapshot that receives every applied event; may be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="eventStream"/> is null.</exception>
    public SiteAuditTelemetryStalledTracker(EventStream eventStream, AuditCentralHealthSnapshot? snapshot)
    {
        _eventStream = eventStream ?? throw new ArgumentNullException(nameof(eventStream));
        // No subscriber actor: subscribing needs an IActorRef, which can only
        // be created from an ActorSystem.
        _subscriber = null;
        _snapshot = snapshot;
    }

    /// <summary>
    /// Production ctor: subscribes a small internal actor to the supplied
    /// system's EventStream so every published
    /// <see cref="SiteAuditTelemetryStalledChanged"/> updates the latched
    /// per-site map. <see cref="Dispose"/> tears the subscriber down.
    /// </summary>
    /// <param name="actorSystem">System whose EventStream is subscribed and which hosts the subscriber actor.</param>
    /// <exception cref="ArgumentNullException"><paramref name="actorSystem"/> is null.</exception>
    public SiteAuditTelemetryStalledTracker(ActorSystem actorSystem)
        : this(actorSystem, snapshot: null)
    {
    }

    /// <summary>
    /// Production ctor with a snapshot sink. Every observed
    /// <see cref="SiteAuditTelemetryStalledChanged"/> is mirrored onto the
    /// shared <see cref="AuditCentralHealthSnapshot"/>, so the central health
    /// surface sees per-site stalled state without re-reading the tracker.
    /// </summary>
    /// <param name="actorSystem">System whose EventStream is subscribed and which hosts the subscriber actor.</param>
    /// <param name="snapshot">Optional health snapshot that receives every applied event; may be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="actorSystem"/> is null.</exception>
    public SiteAuditTelemetryStalledTracker(ActorSystem actorSystem, AuditCentralHealthSnapshot? snapshot)
    {
        ArgumentNullException.ThrowIfNull(actorSystem);
        _eventStream = actorSystem.EventStream;
        // Assigned before the actor is created: the actor calls back into
        // Apply, which reads _snapshot.
        _snapshot = snapshot;

        // Anonymous subscriber actor scoped to the system. The props build it
        // with a callback into THIS tracker's Apply method, so the actor's
        // single-threaded receive serialises every dictionary write.
        _subscriber = actorSystem.ActorOf(
            Props.Create(() => new StalledChangedSubscriber(this)),
            name: $"site-audit-stalled-tracker-{Guid.NewGuid():N}");

        // Subscribe synchronously from the ctor so the subscription is in
        // place before the tracker is returned to the caller. The actor's
        // own PreStart runs asynchronously and would otherwise race the
        // first publish. EventStream.Subscribe is thread-safe.
        _eventStream.Subscribe(_subscriber, typeof(SiteAuditTelemetryStalledChanged));
    }

    /// <summary>
    /// Returns a defensive copy of the per-site latched stalled state.
    /// Absent sites are interpreted as <c>Stalled=false</c> by consumers.
    /// </summary>
    /// <returns>A new dictionary of site id to latched stalled flag; later updates do not affect it.</returns>
    public IReadOnlyDictionary<string, bool> Snapshot() =>
        new Dictionary<string, bool>(_state);

    /// <summary>
    /// Applied by the internal subscriber actor on every
    /// <see cref="SiteAuditTelemetryStalledChanged"/> publication. Exposed
    /// internally so tests against the bare-stream ctor can still drive the
    /// tracker; the production path always goes through the actor.
    /// </summary>
    /// <param name="evt">The transition to latch. Null events and events without a site id are ignored.</param>
    internal void Apply(SiteAuditTelemetryStalledChanged evt)
    {
        // A null SiteId cannot be a dictionary key: ConcurrentDictionary would
        // throw, which on the actor path crashes and restarts the subscriber.
        // Such an event carries nothing to latch, so drop it.
        if (evt is null || evt.SiteId is null)
        {
            return;
        }

        _state[evt.SiteId] = evt.Stalled;

        // Mirror into the central health snapshot if one is wired, so a reader
        // of the health surface sees the same per-site state without a second
        // lookup. The snapshot is optional (test composition roots may skip it).
        _snapshot?.ApplyStalled(evt);
    }

    /// <summary>
    /// Stops event-driven updates. The subscription is removed synchronously
    /// and the subscriber actor is then stopped. Safe to call more than once;
    /// a no-op for the bare-stream ctor beyond marking the tracker disposed.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        if (_subscriber is null)
        {
            return;
        }

        // Unsubscribe synchronously, mirroring the synchronous subscribe in the
        // ctor. Otherwise a publish racing shutdown would be enqueued behind the
        // PoisonPill and surface as a dead letter. PostStop unsubscribes again
        // as a backstop; a second Unsubscribe finds nothing to remove.
        _eventStream.Unsubscribe(_subscriber, typeof(SiteAuditTelemetryStalledChanged));

        // PoisonPill is processed after any events already in the mailbox, so
        // those still land before the actor stops.
        _subscriber.Tell(PoisonPill.Instance);
    }

    /// <summary>
    /// Internal subscriber actor. It receives every
    /// <see cref="SiteAuditTelemetryStalledChanged"/> off the EventStream and
    /// forwards it into the parent <see cref="SiteAuditTelemetryStalledTracker"/>.
    /// Unlike <c>DeadLetterMonitorActor</c>, the subscription is registered by
    /// the tracker constructor BEFORE this actor begins processing messages, so
    /// publishes that arrive between actor creation and PreStart cannot be
    /// missed. Unsubscribe still runs in <see cref="PostStop"/>.
    /// </summary>
    private sealed class StalledChangedSubscriber : ReceiveActor
    {
        private readonly SiteAuditTelemetryStalledTracker _parent;

        public StalledChangedSubscriber(SiteAuditTelemetryStalledTracker parent)
        {
            _parent = parent;
            Receive<SiteAuditTelemetryStalledChanged>(evt => _parent.Apply(evt));
        }

        /// <summary>
        /// Deliberately does not call the base implementation. The default
        /// stops children (this actor has none) and then calls
        /// <see cref="PostStop"/>, which would unsubscribe from the
        /// EventStream. The subscription is made by the tracker ctor rather
        /// than PreStart, so nothing would re-establish it after the restart.
        /// Keeping it across the restart keeps the tracker live after a
        /// failing event.
        /// </summary>
        protected override void PreRestart(Exception reason, object message)
        {
        }

        protected override void PostStop()
        {
            Context.System.EventStream.Unsubscribe(Self, typeof(SiteAuditTelemetryStalledChanged));
            base.PostStop();
        }
    }
}
|