refactor(historian): gate before translate (no discarded alloc on secondary) + strengthen double-write warning (review)
This commit is contained in:
@@ -60,15 +60,18 @@ public sealed class HistorianAdapterActor : ReceiveActor
|
||||
_sink = sink;
|
||||
_localNode = localNode;
|
||||
|
||||
// A direct AlarmHistorianEvent source (kept for a future engine→historian path) goes through the
|
||||
// same Primary gate as the alerts-topic feed below.
|
||||
Receive<AlarmHistorianEvent>(Historize);
|
||||
// A direct AlarmHistorianEvent source is kept for a future engine→historian path; it goes through the
|
||||
// SAME ShouldHistorize gate. WARNING: do NOT Tell an AlarmHistorianEvent for a transition that is ALSO
|
||||
// published on the `alerts` topic — the sink would then double-write (both handlers enqueue it).
|
||||
Receive<AlarmHistorianEvent>(evt => { if (ShouldHistorize()) _ = EnqueueAsync(evt); });
|
||||
|
||||
// Live alarm transitions arrive off the cluster `alerts` DPS topic (subscribed in PreStart). The
|
||||
// Primary ScriptedAlarmHostActor publishes each transition ONCE, but DistributedPubSub fans that
|
||||
// single message to EVERY node's subscriber — including BOTH central nodes' historian adapters. The
|
||||
// Primary gate in Historize keeps only the Primary writing ⇒ exactly-once across the warm pair.
|
||||
Receive<AlarmTransitionEvent>(t => Historize(Translate(t)));
|
||||
// ShouldHistorize gate keeps only the Primary writing ⇒ exactly-once across the warm pair.
|
||||
// NOTE: Translate is intentionally inside the gate so Secondary/Detached nodes never allocate a
|
||||
// discarded AlarmHistorianEvent.
|
||||
Receive<AlarmTransitionEvent>(t => { if (ShouldHistorize()) _ = EnqueueAsync(Translate(t)); });
|
||||
|
||||
Receive<GetStatus>(_ => Sender.Tell(_sink.GetStatus()));
|
||||
|
||||
@@ -79,22 +82,11 @@ public sealed class HistorianAdapterActor : ReceiveActor
|
||||
Receive<SubscribeAck>(_ => { });
|
||||
}
|
||||
|
||||
/// <summary>Gates a historian event to the Primary then enqueues it fire-and-forget. Warm-standby
|
||||
/// dedup: only the Primary historizes to the durable sink so the per-node alerts feed writes exactly
|
||||
/// once. Default-write until told Secondary/Detached so single-node deploys + the boot window never
|
||||
/// drop historization. Fire-and-forget because <see cref="SqliteStoreAndForwardSink"/> persists to
|
||||
/// local SQLite synchronously inside <see cref="EnqueueAsync"/> (it returns once the row is committed),
|
||||
/// so we don't block the mailbox on network/pipe latency; failures surface via <see cref="GetStatus"/>.</summary>
|
||||
/// <param name="evt">The historian event to gate + enqueue.</param>
|
||||
private void Historize(AlarmHistorianEvent evt)
|
||||
{
|
||||
if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_ = EnqueueAsync(evt);
|
||||
}
|
||||
/// <summary>True iff this node should write to the durable sink: only the Primary historizes
|
||||
/// (default-write while the role is unknown so single-node deploys + the boot window never drop
|
||||
/// historization). DPS fans the Primary's single alerts publish to BOTH nodes' historians, so this
|
||||
/// gate is what keeps the durable write exactly-once across the warm-redundant pair.</summary>
|
||||
private bool ShouldHistorize() => _localRole is not (RedundancyRole.Secondary or RedundancyRole.Detached);
|
||||
|
||||
/// <summary>Translates a live <see cref="AlarmTransitionEvent"/> (the alerts-topic shape) into the
|
||||
/// historian's <see cref="AlarmHistorianEvent"/>. <c>AlarmTypeName</c> is null-coalesced to
|
||||
|
||||
+1
-3
@@ -29,12 +29,11 @@ public sealed class HistorianAdapterActorTests : RuntimeActorTestBase
|
||||
/// <summary>Thread-safe fake sink that records every <see cref="EnqueueAsync"/> call.</summary>
|
||||
private sealed class RecordingSink : IAlarmHistorianSink
|
||||
{
|
||||
private int _count;
|
||||
private readonly object _lock = new();
|
||||
private readonly List<AlarmHistorianEvent> _events = new();
|
||||
|
||||
/// <summary>The number of <see cref="EnqueueAsync"/> calls observed so far.</summary>
|
||||
public int EnqueueCount => Volatile.Read(ref _count);
|
||||
public int EnqueueCount { get { lock (_lock) { return _events.Count; } } }
|
||||
|
||||
/// <summary>A snapshot of every event enqueued so far (in arrival order).</summary>
|
||||
public IReadOnlyList<AlarmHistorianEvent> Events
|
||||
@@ -50,7 +49,6 @@ public sealed class HistorianAdapterActorTests : RuntimeActorTestBase
|
||||
_events.Add(evt);
|
||||
}
|
||||
|
||||
Interlocked.Increment(ref _count);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user