fix(debug-stream): stream-first lifecycle with replay/dedup (#26, M2.18)

Re-architect DebugStreamBridgeActor from snapshot-first to stream-first so no
attribute/alarm event occurring during the snapshot-build + network-transit
window is lost (#26).

Lifecycle change:
- PreStart now opens the gRPC subscription FIRST (alongside sending the
  SubscribeDebugViewRequest), so live events start flowing immediately.
- Phase model via a single _snapshotDelivered flag (mutated only on the actor
  thread). While buffering (snapshot not yet delivered), AttributeValueChanged/
  AlarmStateChanged are appended to an ordered _preSnapshotBuffer instead of
  being delivered. After snapshot+flush, the same handlers pass through directly.
- On DebugViewSnapshot: deliver snapshot, then flush the buffer in arrival order
  with per-entity dedup, then set _snapshotDelivered=true (pass-through).
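
The phase model above can be sketched in isolation. This is a minimal illustration only, assuming a hypothetical `PhaseGate` class with plain strings standing in for the actor, the gRPC stream, and the real message types; dedup is deliberately left out here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the actor's phase model: no Akka, no gRPC, and no
// dedup (every buffered event is flushed). Events are plain strings.
public sealed class PhaseGate
{
    private readonly Action<string> _deliver;
    private readonly List<string> _preSnapshotBuffer = new();
    private bool _snapshotDelivered;

    public PhaseGate(Action<string> deliver) => _deliver = deliver;

    // Live event: buffer while the snapshot is pending, pass through afterwards.
    public void OnLiveEvent(string evt)
    {
        if (_snapshotDelivered) { _deliver(evt); return; }
        _preSnapshotBuffer.Add(evt);
    }

    // Snapshot: deliver it first, flush the buffer in arrival order, then
    // switch to pass-through. A duplicate or late snapshot is ignored.
    public void OnSnapshot(string snapshot)
    {
        if (_snapshotDelivered) return;
        _deliver(snapshot);
        foreach (var evt in _preSnapshotBuffer) _deliver(evt);
        _preSnapshotBuffer.Clear();
        _snapshotDelivered = true;
    }
}
```

Feeding two live events, then the snapshot, then a third live event delivers the snapshot first, followed by the three events in arrival order.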

Dedup rule (exactly-once):
- Identity: attributes by (InstanceUniqueName, AttributePath, AttributeName);
  alarms by (InstanceUniqueName, AlarmName, SourceReference) so native
  per-condition alarms are not conflated. Key components are joined with a NUL
  delimiter (declared as an escaped char constant; no raw NUL in source), which
  cannot occur in a name, so distinct identities never collide the way they
  could with a space delimiter.
- Boundary: a buffered event whose timestamp is <= the snapshot's timestamp for
  the same entity is already reflected -> DROP; strictly-newer (>) -> DELIVER;
  entity absent from the snapshot -> DELIVER (genuine gap-window event).
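
As a rough illustration of the boundary rule, the sketch below uses a hypothetical `DedupSketch` helper (not the actor's actual methods) that takes a per-entity "as-of" map built from the snapshot and decides whether a buffered event should be delivered. The key builder mirrors the NUL-delimited identity described above; the parameter names are assumptions.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper illustrating the dedup boundary; not the real actor code.
public static class DedupSketch
{
    // Escaped NUL: names may contain spaces, but not this character.
    private const char KeyDelimiter = '\u0000';

    // Joins three identity components into one dictionary key.
    public static string Key(string instance, string second, string third) =>
        $"{instance}{KeyDelimiter}{second}{KeyDelimiter}{third}";

    // Deliver when the entity is absent from the snapshot, or when the event
    // is strictly newer than the snapshot's timestamp for that entity.
    // Equal or older timestamps are already reflected and are dropped.
    public static bool ShouldDeliver(
        IReadOnlyDictionary<string, DateTimeOffset> snapshotAsOf,
        string key,
        DateTimeOffset eventTimestamp)
    {
        if (!snapshotAsOf.TryGetValue(key, out var asOf)) return true;
        return eventTimestamp > asOf;
    }
}
```

With a space delimiter, ("A B", "C", "D") and ("A", "B C", "D") would produce the same key; with the NUL delimiter they stay distinct.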

Preserved paths:
- M2.11 InstanceNotFound: with stream-first the gRPC stream is already open, so
  the not-found path now tears it down (CleanupGrpc) + clears the buffer, does
  NOT enter pass-through, delivers the not-found snapshot, and stops cleanly.
- Reconnect (ReconnectGrpcStream -> OpenGrpcStream) does not touch the phase
  flag: a mid-session reconnect resumes pass-through; a reconnect during the
  buffering phase stays buffering until the snapshot arrives.
- Communication-008 retry/stability/stop/terminate + ReceiveTimeout orphan net
  unchanged. Duplicate/late snapshot after delivery is ignored defensively.
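
The not-found and reconnect behaviour can be pictured with a small state holder. This is a sketch under stated assumptions: `TeardownSketch` and its members are hypothetical, a `CancellationTokenSource` stands in for the open gRPC stream, and delivery to the consumer is omitted.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Threading;

// Hypothetical model of the preserved paths; not the real actor.
public sealed class TeardownSketch
{
    private readonly List<string> _preSnapshotBuffer = new();
    private CancellationTokenSource? _grpcCts = new(); // stream-first: open from the start

    public bool SnapshotDelivered { get; private set; }
    public bool Stopped { get; private set; }
    public bool StreamOpen => _grpcCts is { IsCancellationRequested: false };
    public int BufferedCount => _preSnapshotBuffer.Count;

    // Only the buffering phase is modelled; pass-through delivery is omitted.
    public void OnLiveEvent(string evt)
    {
        if (!Stopped && !SnapshotDelivered) _preSnapshotBuffer.Add(evt);
    }

    // Reconnect replaces the stream but leaves the phase flag and buffer alone.
    public void Reconnect()
    {
        if (Stopped) return;
        CleanupGrpc();
        _grpcCts = new CancellationTokenSource();
    }

    // Not-found: tear down the stream, discard the buffer, never enter pass-through.
    public void OnInstanceNotFound()
    {
        Stopped = true;
        CleanupGrpc();
        _preSnapshotBuffer.Clear();
    }

    private void CleanupGrpc()
    {
        _grpcCts?.Cancel();
        _grpcCts?.Dispose();
        _grpcCts = null;
    }
}
```

A reconnect during buffering keeps the buffered events and the buffering phase; a not-found result closes the stream and empties the buffer without ever setting the pass-through flag.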

Tests: 10 new M2.18 tests (stream-first ordering, gap-window buffering, dedup
drop/deliver for attrs + alarms, ordering, pass-through, InstanceNotFound
teardown, reconnect-during-buffering, reconnect-after-snapshot) + revised the
M2.11 not-found test to assert stream teardown. Full DebugStreamBridgeActor
class green: 23/23.
Joseph Doherty
2026-06-16 07:33:26 -04:00
parent c1043569f6
commit d8519cb464
3 changed files with 595 additions and 24 deletions
@@ -10,10 +10,24 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Actors;
/// Long-lived (one per active debug session) actor on the central side. Debug sessions
/// are session-based and temporary — this actor holds no persisted state and does not
/// derive from an Akka.Persistence base class; its state does not survive a restart.
-/// Sends SubscribeDebugViewRequest to the site via CentralCommunicationActor (with THIS actor
-/// as the Sender) to get the initial snapshot. After receiving the snapshot, opens a gRPC
-/// server-streaming subscription via SiteStreamGrpcClient for ongoing events.
-/// Stream events are marshalled back to the actor via Self.Tell for thread safety.
/// <para>
/// <b>Stream-first lifecycle (M2.18, #26).</b> To avoid losing any
/// <see cref="AttributeValueChanged"/>/<see cref="AlarmStateChanged"/> that occurs on
/// the site during the snapshot-build + network-transit window, the gRPC server-streaming
/// subscription is opened FIRST (in <see cref="PreStart"/>), alongside the
/// <c>SubscribeDebugViewRequest</c> sent to the site via CentralCommunicationActor (with
/// THIS actor as the Sender). Live events that arrive before the
/// <see cref="DebugViewSnapshot"/> is delivered are <em>buffered in arrival order</em>.
/// When the snapshot arrives it is delivered to the consumer, then the buffer is flushed
/// in order, <em>deduped</em> against the snapshot (an event whose per-entity timestamp is
/// &lt;= the snapshot's timestamp for the same entity is already reflected → dropped; a
/// strictly-newer event is delivered; an event for an entity absent from the snapshot is
/// delivered). After the flush the actor switches to pass-through: subsequent events go
/// straight to the consumer. A mid-session reconnect (after the snapshot) resumes
/// pass-through — the snapshot is a one-time thing.
/// </para>
/// Stream events are marshalled back to the actor via Self.Tell for thread safety; all
/// state (phase flag + buffer) is mutated only on the actor thread.
/// </summary>
public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
{
@@ -49,6 +63,31 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
private bool _stopped;
private CancellationTokenSource? _grpcCts;
/// <summary>
/// Phase flag (M2.18). <see langword="false"/> until the initial
/// <see cref="DebugViewSnapshot"/> has been delivered and the pre-snapshot buffer
/// flushed; <see langword="true"/> thereafter (pass-through). Mutated only on the
/// actor thread. A reconnect does NOT touch this flag — a mid-session reconnect
/// (after the snapshot) therefore stays in pass-through, and a reconnect during the
/// buffering phase (before the snapshot) stays buffering.
/// </summary>
private bool _snapshotDelivered;
/// <summary>
/// Ordered buffer of live gRPC events (<see cref="AttributeValueChanged"/>/
/// <see cref="AlarmStateChanged"/>) that arrived before the snapshot was delivered.
/// Flushed (with per-entity dedup against the snapshot) when the snapshot arrives,
/// then never used again. Mutated only on the actor thread.
/// </summary>
private readonly List<object> _preSnapshotBuffer = new();
/// <summary>
/// Defensive log threshold: if the pre-snapshot buffer grows past this many events
/// during a slow snapshot we log once (events are NOT dropped — the window is short).
/// </summary>
private const int BufferWarnThreshold = 10_000;
private bool _bufferWarned;
/// <summary>Timer scheduler for reconnect and stability window timers.</summary>
public ITimerScheduler Timers { get; set; } = null!;
@@ -87,16 +126,33 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
// Initial snapshot response from the site (via ClusterClient).
// M2.11: if the site reports InstanceNotFound=true the instance is not
-// deployed there. Forward the snapshot (with InstanceNotFound=true) to
-// _onEvent so DebugStreamService's TCS resolves and the caller can
-// inspect the flag; then stop cleanly without opening a gRPC stream.
// deployed there. M2.18: under the stream-first lifecycle the gRPC stream
// was already opened in PreStart, so the not-found path must tear it down
// (CleanupGrpc) rather than enter pass-through. Forward the snapshot (with
// InstanceNotFound=true) to _onEvent so DebugStreamService's TCS resolves and
// the caller can inspect the flag; then stop cleanly.
Receive<DebugViewSnapshot>(snapshot =>
{
if (_snapshotDelivered)
{
// Defensive: a duplicate / late snapshot after we have already moved to
// pass-through. The snapshot is a one-time thing — ignore replays so we
// never re-buffer or double-deliver.
_log.Debug("Ignoring duplicate DebugViewSnapshot for {0} (already delivered)",
_instanceUniqueName);
return;
}
if (snapshot.InstanceNotFound)
{
_log.Warning("Instance {0} is not deployed on site; terminating debug stream",
_instanceUniqueName);
_stopped = true;
// M2.18: the stream-first subscription opened in PreStart is for a
// non-deployed instance — cancel it (and any buffered gap events are
// discarded with the actor). No pass-through.
CleanupGrpc();
_preSnapshotBuffer.Clear();
_onEvent(snapshot); // resolves the snapshot TCS with InstanceNotFound=true
// Note: after Context.Stop(Self) below the actor is dead. DebugStreamService
// inspects InitialSnapshot.InstanceNotFound and calls StopStream, which sends
@@ -106,10 +162,15 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
return;
}
-_log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms)",
-_instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count);
_log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms); flushing {3} buffered event(s)",
_instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count,
_preSnapshotBuffer.Count);
// Deliver the snapshot, then flush the gap-window buffer (deduped), then
// switch to pass-through. Order matters: snapshot first, buffered events next.
_onEvent(snapshot);
-OpenGrpcStream();
FlushBuffer(snapshot);
_snapshotDelivered = true;
});
// Domain events arriving via Self.Tell from gRPC callback.
@@ -117,8 +178,11 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
// flapping stream that delivers a single event between failures would
// otherwise never trip MaxRetries. The retry budget is recovered only by
// GrpcStreamStable (a stream that has stayed up for StabilityWindow).
-Receive<AttributeValueChanged>(changed => _onEvent(changed));
-Receive<AlarmStateChanged>(changed => _onEvent(changed));
// M2.18: before the snapshot has been delivered, BUFFER (in arrival order)
// rather than deliver — these may be gap-window events. After the snapshot has
// been flushed, pass through directly (same handler, phase-dependent behavior).
Receive<AttributeValueChanged>(changed => HandleStreamEvent(changed));
Receive<AlarmStateChanged>(changed => HandleStreamEvent(changed));
// Stream has been stably connected for StabilityWindow — recover the
// retry budget so a future transient fault gets a fresh set of retries.
@@ -173,11 +237,149 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
});
}
/// <summary>
/// Handles a live gRPC stream event (<see cref="AttributeValueChanged"/> or
/// <see cref="AlarmStateChanged"/>). Before the snapshot has been delivered the
/// event is appended to the ordered pre-snapshot buffer (gap-window capture); after
/// the snapshot+flush it is passed straight through to the consumer. Always runs on
/// the actor thread (events are marshalled in via Self.Tell), so the phase flag and
/// buffer are accessed without locking.
/// </summary>
private void HandleStreamEvent(object evt)
{
if (_snapshotDelivered)
{
_onEvent(evt);
return;
}
_preSnapshotBuffer.Add(evt);
if (!_bufferWarned && _preSnapshotBuffer.Count > BufferWarnThreshold)
{
_bufferWarned = true;
_log.Warning(
"Pre-snapshot debug-event buffer for {0} exceeded {1} events while awaiting the snapshot; " +
"events are still retained (not dropped).",
_instanceUniqueName, BufferWarnThreshold);
}
}
/// <summary>
/// Flushes the pre-snapshot buffer in arrival order, deduping each event against the
/// just-delivered snapshot (M2.18).
/// <para>
/// <b>Dedup rule.</b> Identity is per-entity:
/// attributes by (InstanceUniqueName, AttributePath, AttributeName); alarms by
/// (InstanceUniqueName, AlarmName, SourceReference). For a buffered event whose entity
/// is present in the snapshot, the comparison is against that entity's snapshot
/// timestamp: a buffered timestamp &lt;= the snapshot timestamp means the event is
/// already reflected in the snapshot → DROP; a strictly-newer (&gt;) timestamp means
/// the event happened after the snapshot was built → DELIVER. The boundary is inclusive
/// on the snapshot side (equal timestamps are treated as duplicates) — the snapshot is
/// the authoritative point-in-time value, so an event at the exact same instant carries
/// no new information. A buffered event whose entity is NOT in the snapshot is a genuine
/// gap-window event → DELIVER.
/// </para>
/// </summary>
private void FlushBuffer(DebugViewSnapshot snapshot)
{
if (_preSnapshotBuffer.Count == 0) return;
// Build per-entity "as-of" timestamps from the snapshot. If (defensively) the
// snapshot lists the same entity twice, keep the newest timestamp.
var attrAsOf = new Dictionary<string, DateTimeOffset>();
foreach (var a in snapshot.AttributeValues)
{
var key = AttributeKey(a);
if (!attrAsOf.TryGetValue(key, out var existing) || a.Timestamp > existing)
attrAsOf[key] = a.Timestamp;
}
var alarmAsOf = new Dictionary<string, DateTimeOffset>();
foreach (var al in snapshot.AlarmStates)
{
var key = AlarmKey(al);
if (!alarmAsOf.TryGetValue(key, out var existing) || al.Timestamp > existing)
alarmAsOf[key] = al.Timestamp;
}
var flushed = 0;
var dropped = 0;
foreach (var evt in _preSnapshotBuffer)
{
if (IsReflectedInSnapshot(evt, attrAsOf, alarmAsOf))
{
dropped++;
continue;
}
_onEvent(evt);
flushed++;
}
if (dropped > 0 || flushed > 0)
{
_log.Debug("Flushed {0} buffered debug event(s) for {1}, dropped {2} as already-in-snapshot",
flushed, _instanceUniqueName, dropped);
}
_preSnapshotBuffer.Clear();
}
/// <summary>
/// Returns <see langword="true"/> when a buffered event is already reflected in the
/// snapshot (same entity, buffered timestamp &lt;= snapshot timestamp) and must be
/// dropped; otherwise <see langword="false"/> (deliver).
/// </summary>
private static bool IsReflectedInSnapshot(
object evt,
IReadOnlyDictionary<string, DateTimeOffset> attrAsOf,
IReadOnlyDictionary<string, DateTimeOffset> alarmAsOf)
{
switch (evt)
{
case AttributeValueChanged a:
return attrAsOf.TryGetValue(AttributeKey(a), out var attrTs) && a.Timestamp <= attrTs;
case AlarmStateChanged al:
return alarmAsOf.TryGetValue(AlarmKey(al), out var alarmTs) && al.Timestamp <= alarmTs;
default:
// Unknown buffered type (should not happen — only attr/alarm are buffered):
// never treat as a duplicate.
return false;
}
}
/// <summary>
/// Delimiter used to join identity components into a single dedup key. A NUL
/// control character cannot appear in an instance/attribute/alarm name, so
/// distinct identities never collide on a shared boundary (unlike a space, which
/// may legitimately occur within a name). Declared as an escaped char so the
/// source carries no raw NUL byte.
/// </summary>
private const char KeyDelimiter = '\u0000';
/// <summary>Per-entity dedup key for an attribute change.</summary>
private static string AttributeKey(AttributeValueChanged a) =>
string.Concat(a.InstanceUniqueName, KeyDelimiter, a.AttributePath, KeyDelimiter, a.AttributeName);
/// <summary>
/// Per-entity dedup key for an alarm change. Includes <see cref="AlarmStateChanged.SourceReference"/>
/// so native per-condition alarms (which share an AlarmName but differ by source
/// reference) are not conflated; empty for computed alarms.
/// </summary>
private static string AlarmKey(AlarmStateChanged al) =>
string.Concat(al.InstanceUniqueName, KeyDelimiter, al.AlarmName, KeyDelimiter, al.SourceReference);
/// <inheritdoc />
protected override void PreStart()
{
_log.Info("Starting debug stream bridge for {0} on site {1}", _instanceUniqueName, _siteIdentifier);
// M2.18 stream-first: open the gRPC live-event subscription BEFORE (and
// alongside) requesting the snapshot, so events occurring during the
// snapshot-build + network-transit window are captured (buffered) and not lost.
OpenGrpcStream();
// Send subscribe request via CentralCommunicationActor for the initial snapshot.
var request = new SubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
var envelope = new SiteEnvelope(_siteIdentifier, request);