feat(health): SiteAuditTelemetryStalledTracker subscribes to EventStream (#23 M6)
This commit is contained in:
@@ -0,0 +1,158 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Akka.Actor;
|
||||
using Akka.Event;
|
||||
|
||||
namespace ScadaLink.AuditLog.Central;
|
||||
|
||||
/// <summary>
/// Audit Log (#23) M6 Bundle E (T7) — central singleton that subscribes to the
/// actor system's EventStream for <see cref="SiteAuditTelemetryStalledChanged"/>
/// publications and maintains a per-site latched stalled-state map readable
/// via <see cref="Snapshot"/>. Consumed by the M6 Bundle E
/// <see cref="AuditCentralHealthSnapshot"/> aggregator so the central health
/// surface can surface per-site "reconciliation isn't draining" without
/// coupling the publisher (<see cref="SiteAuditReconciliationActor"/>) to the
/// health collection plumbing.
/// </summary>
/// <remarks>
/// <para>
/// <b>Why an internal actor.</b> Akka.NET's <see cref="EventStream"/> only
/// supports <see cref="IActorRef"/> subscribers — there is no callback or
/// channel-based overload. The tracker therefore spawns a small subscriber
/// actor that forwards each event into the shared
/// <see cref="ConcurrentDictionary{TKey,TValue}"/> on the actor's thread, and
/// readers (<see cref="Snapshot"/>) take a copy off that dictionary on any
/// thread.
/// </para>
/// <para>
/// <b>Subscription lifecycle.</b> The ActorSystem constructor subscribes the
/// actor synchronously, so no publish can slip in before the actor's
/// asynchronous start. The actor additionally (re-)subscribes in
/// <see cref="ActorBase.PreStart"/> and unsubscribes in
/// <see cref="ActorBase.PostStop"/>: the default restart path invokes
/// <c>PostStop</c>, so without the <c>PreStart</c> subscribe a supervisor
/// restart would silently drop the subscription. <see cref="Dispose"/>
/// unsubscribes synchronously and then stops the actor with a
/// <see cref="PoisonPill"/>.
/// </para>
/// <para>
/// <b>Per-site latching.</b> The publisher (<see cref="SiteAuditReconciliationActor"/>)
/// only publishes on stalled-state transitions, so the dictionary is the
/// authoritative latched state. Sites that have never published are absent
/// from the snapshot — the consumer surface treats absence as
/// <c>Stalled=false</c> (default healthy), the same default the reconciliation
/// actor's own internal latch uses.
/// </para>
/// <para>
/// <b>Singleton lifecycle.</b> Registered as a singleton via
/// <see cref="ServiceCollectionExtensions.AddAuditLogCentralMaintenance"/>;
/// <see cref="Dispose"/> tears the internal subscriber down at host shutdown.
/// After disposal the map is frozen: late events are ignored.
/// </para>
/// </remarks>
public sealed class SiteAuditTelemetryStalledTracker : IDisposable
{
    private readonly EventStream _eventStream;
    private readonly ConcurrentDictionary<string, bool> _state = new();
    private readonly IActorRef? _subscriber;

    // Written by Dispose() on the caller's thread and read by Apply() on the
    // subscriber actor's thread, hence volatile.
    private volatile bool _disposed;

    /// <summary>
    /// Construct around a bare <see cref="EventStream"/>. Intended for unit
    /// tests that do not need event-driven updates. No subscriber actor is
    /// created in this mode: the tracker still exposes <see cref="Snapshot"/>
    /// but never self-subscribes; production callers always go through
    /// <see cref="SiteAuditTelemetryStalledTracker(ActorSystem)"/>.
    /// </summary>
    /// <remarks>
    /// Subscribing to <see cref="EventStream"/> requires an <see cref="IActorRef"/>,
    /// which can only be created from an <see cref="ActorSystem"/>. The bare-
    /// stream ctor therefore can NOT itself wire the subscriber — tests that
    /// want event-driven updates must use the ActorSystem ctor (or push state
    /// directly via <see cref="Apply"/>).
    /// </remarks>
    /// <param name="eventStream">The event stream to hold on to; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="eventStream"/> is null.</exception>
    public SiteAuditTelemetryStalledTracker(EventStream eventStream)
    {
        _eventStream = eventStream ?? throw new ArgumentNullException(nameof(eventStream));

        // No subscriber actor in bare-stream mode; state can only change
        // through direct Apply() calls.
        _subscriber = null;
    }

    /// <summary>
    /// Production ctor: subscribes a small internal actor to the supplied
    /// system's EventStream so every published
    /// <see cref="SiteAuditTelemetryStalledChanged"/> updates the latched
    /// per-site map. <see cref="Dispose"/> tears the subscriber down.
    /// </summary>
    /// <param name="actorSystem">The actor system hosting the subscriber actor; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="actorSystem"/> is null.</exception>
    public SiteAuditTelemetryStalledTracker(ActorSystem actorSystem)
    {
        ArgumentNullException.ThrowIfNull(actorSystem);
        _eventStream = actorSystem.EventStream;

        // Uniquely named subscriber actor; its props capture THIS tracker so
        // the actor's single-threaded receive funnels every event through
        // Apply().
        _subscriber = actorSystem.ActorOf(
            Props.Create(() => new StalledChangedSubscriber(this)),
            name: $"site-audit-stalled-tracker-{Guid.NewGuid():N}");

        // Subscribe synchronously from the ctor so the subscription is in
        // place before the tracker is returned to the caller — the actor's
        // own PreStart runs asynchronously and would otherwise race the
        // first publish.
        _eventStream.Subscribe(_subscriber, typeof(SiteAuditTelemetryStalledChanged));
    }

    /// <summary>
    /// Returns a defensive copy of the per-site latched stalled state.
    /// Absent sites are interpreted as <c>Stalled=false</c> by consumers.
    /// </summary>
    /// <returns>A new dictionary that later events do not modify.</returns>
    public IReadOnlyDictionary<string, bool> Snapshot() =>
        new Dictionary<string, bool>(_state);

    /// <summary>
    /// Applied by the internal subscriber actor on every
    /// <see cref="SiteAuditTelemetryStalledChanged"/> publication. Exposed
    /// internally so tests against the bare-stream ctor can still drive the
    /// tracker, but the production path always goes through the actor.
    /// </summary>
    /// <remarks>
    /// Silently ignores a null event, an event without a site id, and any
    /// event arriving after <see cref="Dispose"/>. Never throwing here keeps
    /// the subscriber actor from being restarted by a malformed event.
    /// </remarks>
    /// <param name="evt">The stalled-state transition to latch.</param>
    internal void Apply(SiteAuditTelemetryStalledChanged evt)
    {
        if (_disposed || evt?.SiteId is not { } siteId)
        {
            return;
        }

        _state[siteId] = evt.Stalled;
    }

    /// <summary>
    /// Unsubscribes the internal actor from the EventStream and stops it.
    /// Safe to call more than once; the latched map stays readable via
    /// <see cref="Snapshot"/> but no longer changes.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        if (_subscriber is null)
        {
            return;
        }

        // Unsubscribe here rather than relying solely on the actor's PostStop:
        // PostStop runs asynchronously, and anything published in between
        // would be routed to a stopping actor.
        _eventStream.Unsubscribe(_subscriber, typeof(SiteAuditTelemetryStalledChanged));

        // PoisonPill is queued behind any events already in the mailbox; those
        // still reach Apply(), which drops them because _disposed is set.
        _subscriber.Tell(PoisonPill.Instance);
    }

    /// <summary>
    /// Internal subscriber actor — receives every
    /// <see cref="SiteAuditTelemetryStalledChanged"/> off the EventStream and
    /// forwards it into the parent <see cref="SiteAuditTelemetryStalledTracker"/>.
    /// The tracker constructor registers the initial subscription before this
    /// actor starts processing messages; <see cref="PreStart"/> repeats the
    /// subscribe so that a restart, which passes through
    /// <see cref="PostStop"/>, ends up subscribed again.
    /// </summary>
    private sealed class StalledChangedSubscriber : ReceiveActor
    {
        private readonly SiteAuditTelemetryStalledTracker _parent;

        public StalledChangedSubscriber(SiteAuditTelemetryStalledTracker parent)
        {
            _parent = parent;
            Receive<SiteAuditTelemetryStalledChanged>(evt => _parent.Apply(evt));
        }

        /// <summary>
        /// Ensures the actor is subscribed. Redundant on first start (the
        /// tracker ctor already subscribed) but required after a restart.
        /// Skipped once the tracker has been disposed.
        /// </summary>
        protected override void PreStart()
        {
            if (!_parent._disposed)
            {
                Context.System.EventStream.Subscribe(Self, typeof(SiteAuditTelemetryStalledChanged));
            }

            base.PreStart();
        }

        /// <summary>
        /// Removes this actor's subscription when it stops (or is restarted).
        /// </summary>
        protected override void PostStop()
        {
            Context.System.EventStream.Unsubscribe(Self, typeof(SiteAuditTelemetryStalledChanged));
            base.PostStop();
        }
    }
}
|
||||
@@ -259,6 +259,17 @@ public static class ServiceCollectionExtensions
|
||||
.Bind(config.GetSection(PartitionMaintenanceSectionName));
|
||||
services.AddHostedService<AuditLogPartitionMaintenanceService>();
|
||||
|
||||
// M6 Bundle E (T7): central-singleton tracker for per-site
|
||||
// SiteAuditTelemetryStalledChanged events published by
|
||||
// SiteAuditReconciliationActor. Singleton so the EventStream
|
||||
// subscription registers exactly once on host startup; the tracker's
|
||||
// ctor needs an ActorSystem which is composed by the AkkaHostedService
|
||||
// — we go through a factory so the SP is the source of truth for the
|
||||
// system reference rather than re-resolving it on the consumer side.
|
||||
services.AddSingleton<SiteAuditTelemetryStalledTracker>(sp =>
|
||||
new SiteAuditTelemetryStalledTracker(
|
||||
sp.GetRequiredService<Akka.Actor.ActorSystem>()));
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,116 @@
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
using ScadaLink.AuditLog.Central;
|
||||
|
||||
namespace ScadaLink.AuditLog.Tests.Central;
|
||||
|
||||
/// <summary>
/// Bundle E (M6-T7) tests for <see cref="SiteAuditTelemetryStalledTracker"/>.
/// The tracker subscribes to the actor system's EventStream for
/// <see cref="SiteAuditTelemetryStalledChanged"/> publications and maintains a
/// per-site latch the central health surface can read. Since reconciliation is
/// central-driven, the "stalled" state semantically belongs to central — not
/// to the per-site <see cref="ScadaLink.Commons.Messages.Health.SiteHealthReport"/>
/// payload (which the site itself emits). The tracker therefore lives as a
/// central singleton, not on the site health collector.
/// </summary>
public class SiteAuditTelemetryStalledTrackerTests : TestKit
{
    /// <summary>
    /// Helper: publishes a stalled-changed event on the actor system's
    /// EventStream, then polls the tracker's snapshot (every 20 ms, for up to
    /// 2 s) until it reports the published value for the event's site. The
    /// polling is needed because the tracker's subscriber actor handles the
    /// event asynchronously.
    /// </summary>
    private void PublishAndWait(SiteAuditTelemetryStalledTracker tracker, SiteAuditTelemetryStalledChanged evt)
    {
        Sys.EventStream.Publish(evt);

        AwaitAssert(
            () =>
            {
                var snapshot = tracker.Snapshot();
                Assert.True(snapshot.TryGetValue(evt.SiteId, out var stalled),
                    $"tracker did not record event for {evt.SiteId}");
                Assert.Equal(evt.Stalled, stalled);
            },
            duration: TimeSpan.FromSeconds(2),
            interval: TimeSpan.FromMilliseconds(20));
    }

    [Fact]
    public void Initial_Snapshot_IsEmpty()
    {
        using var tracker = new SiteAuditTelemetryStalledTracker(Sys);

        var snapshot = tracker.Snapshot();

        Assert.Empty(snapshot);
    }

    [Fact]
    public void StalledTrue_Event_TrackerReports_Stalled()
    {
        using var tracker = new SiteAuditTelemetryStalledTracker(Sys);

        PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteA", Stalled: true));

        var snapshot = tracker.Snapshot();
        Assert.True(snapshot["siteA"]);
    }

    [Fact]
    public void StalledFalse_Event_TrackerReports_NotStalled()
    {
        using var tracker = new SiteAuditTelemetryStalledTracker(Sys);

        // First flip the site into stalled so the false transition has a
        // prior value to overwrite — mirrors how the reconciliation actor
        // only publishes false after a true.
        PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteA", Stalled: true));
        PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteA", Stalled: false));

        var snapshot = tracker.Snapshot();
        Assert.False(snapshot["siteA"]);
    }

    [Fact]
    public void Multiple_Sites_Tracked_Independently()
    {
        using var tracker = new SiteAuditTelemetryStalledTracker(Sys);

        PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteA", Stalled: true));
        PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteB", Stalled: false));
        PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteC", Stalled: true));

        var snapshot = tracker.Snapshot();
        Assert.Equal(3, snapshot.Count);
        Assert.True(snapshot["siteA"]);
        Assert.False(snapshot["siteB"]);
        Assert.True(snapshot["siteC"]);
    }

    [Fact]
    public void Constructor_With_Null_ActorSystem_Throws()
    {
        Assert.Throws<ArgumentNullException>(
            () => new SiteAuditTelemetryStalledTracker((ActorSystem)null!));
    }

    [Fact]
    public void Constructor_With_Null_EventStream_Throws()
    {
        Assert.Throws<ArgumentNullException>(
            () => new SiteAuditTelemetryStalledTracker((Akka.Event.EventStream)null!));
    }

    [Fact]
    public void Dispose_Is_Idempotent()
    {
        var tracker = new SiteAuditTelemetryStalledTracker(Sys);
        PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteA", Stalled: true));

        tracker.Dispose();
        tracker.Dispose();

        // The latched state stays readable after (repeated) disposal.
        Assert.True(tracker.Snapshot()["siteA"]);
    }

    [Fact]
    public void Dispose_Unsubscribes_From_EventStream()
    {
        var tracker = new SiteAuditTelemetryStalledTracker(Sys);

        PublishAndWait(tracker, new SiteAuditTelemetryStalledChanged("siteA", Stalled: true));

        tracker.Dispose();

        // After dispose any further events are ignored — the snapshot
        // reflects the last known state at dispose time. Publish both an
        // update to a known site and a first event for an unknown one.
        Sys.EventStream.Publish(new SiteAuditTelemetryStalledChanged("siteA", Stalled: false));
        Sys.EventStream.Publish(new SiteAuditTelemetryStalledChanged("siteB", Stalled: true));

        // A negative assertion has nothing to poll for, so give any stray
        // delivery a short window to show up before checking.
        Thread.Sleep(50);

        var snapshot = tracker.Snapshot();
        Assert.True(snapshot["siteA"]);
        Assert.False(snapshot.ContainsKey("siteB"));
    }
}
|
||||
Reference in New Issue
Block a user