diff --git a/docs/plans/2026-06-15-stillpending-m2-implementation.md.tasks.json b/docs/plans/2026-06-15-stillpending-m2-implementation.md.tasks.json
index f20e8fe8..2ab026f3 100644
--- a/docs/plans/2026-06-15-stillpending-m2-implementation.md.tasks.json
+++ b/docs/plans/2026-06-15-stillpending-m2-implementation.md.tasks.json
@@ -15,11 +15,11 @@
{"id": 43, "ref": "M2.11", "subject": "M2.11 #24: debug snapshot unknown-instance returns error", "class": "small", "status": "completed", "commits": ["dbf44b9", "d160c7f"]},
{"id": 44, "ref": "M2.12", "subject": "M2.12 #25: recursion-limit error to site event log", "class": "small", "status": "completed", "commits": ["f08038d", "e2b31a9"]},
{"id": 45, "ref": "M2.13", "subject": "M2.13 #27: populate obtainable OPC UA/MxGateway transition fields", "class": "small", "status": "completed", "commits": ["722b866", "3945789"]},
- {"id": 46, "ref": "M2.14", "subject": "M2.14 #28: readiness gate checks required cluster singletons", "class": "standard", "status": "completed", "commits": ["253bec5"]},
- {"id": 47, "ref": "M2.15", "subject": "M2.15 #29: register site active-node purge gate (DI)", "class": "small", "status": "pending"},
- {"id": 48, "ref": "M2.16", "subject": "M2.16 #30: Health Monitoring consumes FailedWriteCount", "class": "small", "status": "pending"},
- {"id": 49, "ref": "M2.17", "subject": "M2.17 #31: reconcile StateTransitionValidator delete-from-NotDeployed", "class": "small", "status": "pending"},
- {"id": 50, "ref": "M2.18", "subject": "M2.18 #26: debug-stream stream-first ordering + replay/dedup", "class": "high-risk", "status": "pending"},
+ {"id": 46, "ref": "M2.14", "subject": "M2.14 #28: readiness gate checks required cluster singletons", "class": "standard", "status": "completed", "commits": ["253bec5", "6b1cb9e"]},
+ {"id": 47, "ref": "M2.15", "subject": "M2.15 #29: register site active-node purge gate (DI)", "class": "small", "status": "completed", "commits": ["e1ee37e"]},
+ {"id": 48, "ref": "M2.16", "subject": "M2.16 #30: Health Monitoring consumes FailedWriteCount", "class": "small", "status": "completed", "commits": ["d81f747", "c9244d8"]},
+ {"id": 49, "ref": "M2.17", "subject": "M2.17 #31: reconcile StateTransitionValidator delete-from-NotDeployed", "class": "small", "status": "completed", "commits": ["c104356"]},
+ {"id": 50, "ref": "M2.18", "subject": "M2.18 #26: debug-stream stream-first ordering + replay/dedup", "class": "high-risk", "status": "completed", "commits": ["3e31bd8"]},
{"id": 51, "ref": "M2.19", "subject": "M2.19 #15: LDAP periodic re-query for interactive sessions (spike+impl)", "class": "high-risk", "status": "pending"}
],
"deferred": [
diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs
index 8e0ecb1b..941b38f8 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs
@@ -10,10 +10,24 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Actors;
/// Long-lived (one per active debug session) actor on the central side. Debug sessions
/// are session-based and temporary — this actor holds no persisted state and does not
/// derive from an Akka.Persistence base class; its state does not survive a restart.
-/// Sends SubscribeDebugViewRequest to the site via CentralCommunicationActor (with THIS actor
-/// as the Sender) to get the initial snapshot. After receiving the snapshot, opens a gRPC
-/// server-streaming subscription via SiteStreamGrpcClient for ongoing events.
-/// Stream events are marshalled back to the actor via Self.Tell for thread safety.
+///
+/// Stream-first lifecycle (M2.18, #26). To avoid losing any
+/// <see cref="AttributeValueChanged"/> / <see cref="AlarmStateChanged"/> that occurs on
+/// the site during the snapshot-build + network-transit window, the gRPC server-streaming
+/// subscription is opened FIRST (in <see cref="PreStart"/>), alongside the
+/// SubscribeDebugViewRequest sent to the site via CentralCommunicationActor (with
+/// THIS actor as the Sender). Live events that arrive before the
+/// <see cref="DebugViewSnapshot"/> is delivered are buffered in arrival order (see <c>_preSnapshotBuffer</c>).
+/// When the snapshot arrives it is delivered to the consumer, then the buffer is flushed
+/// in order, deduped against the snapshot (an event whose per-entity timestamp is
+/// <= the snapshot's timestamp for the same entity is already reflected → dropped; a
+/// strictly-newer event is delivered; an event for an entity absent from the snapshot is
+/// delivered). After the flush the actor switches to pass-through: subsequent events go
+/// straight to the consumer. A mid-session reconnect (after the snapshot) resumes
+/// pass-through — the snapshot is a one-time thing.
+///
+/// Stream events are marshalled back to the actor via Self.Tell for thread safety; all
+/// state (phase flag + buffer) is mutated only on the actor thread.
///
public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
{
@@ -49,6 +63,31 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
private bool _stopped;
private CancellationTokenSource? _grpcCts;
+ ///
+ /// Phase flag (M2.18). <see langword="false"/> until the initial
+ /// <see cref="DebugViewSnapshot"/> has been delivered and the pre-snapshot buffer
+ /// flushed; thereafter <see langword="true"/> (pass-through). Mutated only on the
+ /// actor thread. A reconnect does NOT touch this flag — a mid-session reconnect
+ /// (after the snapshot) therefore stays in pass-through, and a reconnect during the
+ /// buffering phase (before the snapshot) stays buffering.
+ ///
+ private bool _snapshotDelivered;
+
+ ///
+ /// Ordered buffer of live gRPC events (<see cref="AttributeValueChanged"/> /
+ /// <see cref="AlarmStateChanged"/>) that arrived before the snapshot was delivered.
+ /// Flushed (with per-entity dedup against the snapshot) when the snapshot arrives,
+ /// then never used again. Mutated only on the actor thread.
+ ///
+ private readonly List _preSnapshotBuffer = new();
+
+ ///
+ /// Defensive log threshold: if the pre-snapshot buffer grows past this many events
+ /// during a slow snapshot we log once (events are NOT dropped — the window is short).
+ ///
+ private const int BufferWarnThreshold = 10_000;
+ private bool _bufferWarned;
+
/// Timer scheduler for reconnect and stability window timers.
public ITimerScheduler Timers { get; set; } = null!;
@@ -87,16 +126,33 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
// Initial snapshot response from the site (via ClusterClient).
// M2.11: if the site reports InstanceNotFound=true the instance is not
- // deployed there. Forward the snapshot (with InstanceNotFound=true) to
- // _onEvent so DebugStreamService's TCS resolves and the caller can
- // inspect the flag; then stop cleanly without opening a gRPC stream.
+ // deployed there. M2.18: under the stream-first lifecycle the gRPC stream
+ // was already opened in PreStart, so the not-found path must tear it down
+ // (CleanupGrpc) rather than enter pass-through. Forward the snapshot (with
+ // InstanceNotFound=true) to _onEvent so DebugStreamService's TCS resolves and
+ // the caller can inspect the flag; then stop cleanly.
Receive(snapshot =>
{
+ if (_snapshotDelivered)
+ {
+ // Defensive: a duplicate / late snapshot after we have already moved to
+ // pass-through. The snapshot is a one-time thing — ignore replays so we
+ // never re-buffer or double-deliver.
+ _log.Debug("Ignoring duplicate DebugViewSnapshot for {0} (already delivered)",
+ _instanceUniqueName);
+ return;
+ }
+
if (snapshot.InstanceNotFound)
{
_log.Warning("Instance {0} is not deployed on site; terminating debug stream",
_instanceUniqueName);
_stopped = true;
+ // M2.18: the stream-first subscription opened in PreStart is for a
+ // non-deployed instance — cancel it (and any buffered gap events are
+ // discarded with the actor). No pass-through.
+ CleanupGrpc();
+ _preSnapshotBuffer.Clear();
_onEvent(snapshot); // resolves the snapshot TCS with InstanceNotFound=true
// Note: after Context.Stop(Self) below the actor is dead. DebugStreamService
// inspects InitialSnapshot.InstanceNotFound and calls StopStream, which sends
@@ -106,10 +162,15 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
return;
}
- _log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms)",
- _instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count);
+ _log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms); flushing {3} buffered event(s)",
+ _instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count,
+ _preSnapshotBuffer.Count);
+
+ // Deliver the snapshot, then flush the gap-window buffer (deduped), then
+ // switch to pass-through. Order matters: snapshot first, buffered events next.
_onEvent(snapshot);
- OpenGrpcStream();
+ FlushBuffer(snapshot);
+ _snapshotDelivered = true;
});
// Domain events arriving via Self.Tell from gRPC callback.
@@ -117,8 +178,11 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
// flapping stream that delivers a single event between failures would
// otherwise never trip MaxRetries. The retry budget is recovered only by
// GrpcStreamStable (a stream that has stayed up for StabilityWindow).
- Receive(changed => _onEvent(changed));
- Receive(changed => _onEvent(changed));
+ // M2.18: before the snapshot has been delivered, BUFFER (in arrival order)
+ // rather than deliver — these may be gap-window events. After the snapshot has
+ // been flushed, pass through directly (same handler, phase-dependent behavior).
+ Receive(changed => HandleStreamEvent(changed));
+ Receive(changed => HandleStreamEvent(changed));
// Stream has been stably connected for StabilityWindow — recover the
// retry budget so a future transient fault gets a fresh set of retries.
@@ -173,11 +237,149 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
});
}
+ ///
+ /// Handles a live gRPC stream event (<see cref="AttributeValueChanged"/> or
+ /// <see cref="AlarmStateChanged"/>). Before the snapshot has been delivered the
+ /// event is appended to the ordered pre-snapshot buffer (gap-window capture); after
+ /// the snapshot+flush it is passed straight through to the consumer. Always runs on
+ /// the actor thread (events are marshalled in via Self.Tell), so the phase flag and
+ /// buffer are accessed without locking.
+ ///
+ private void HandleStreamEvent(object evt)
+ {
+ if (_snapshotDelivered)
+ {
+ _onEvent(evt);
+ return;
+ }
+
+ _preSnapshotBuffer.Add(evt);
+ if (!_bufferWarned && _preSnapshotBuffer.Count > BufferWarnThreshold)
+ {
+ _bufferWarned = true;
+ _log.Warning(
+ "Pre-snapshot debug-event buffer for {0} exceeded {1} events while awaiting the snapshot; " +
+ "events are still retained (not dropped).",
+ _instanceUniqueName, BufferWarnThreshold);
+ }
+ }
+
+ ///
+ /// Flushes the pre-snapshot buffer in arrival order, deduping each event against the
+ /// just-delivered snapshot (M2.18).
+ ///
+ /// Dedup rule. Identity is per-entity:
+ /// attributes by (InstanceUniqueName, AttributePath, AttributeName); alarms by
+ /// (InstanceUniqueName, AlarmName, SourceReference). For a buffered event whose entity
+ /// is present in the snapshot, the comparison is against that entity's snapshot
+ /// timestamp: a buffered timestamp <= the snapshot timestamp means the event is
+ /// already reflected in the snapshot → DROP; a strictly-newer (>) timestamp means
+ /// the event happened after the snapshot was built → DELIVER. The boundary is inclusive
+ /// on the snapshot side (equal timestamps are treated as duplicates) — the snapshot is
+ /// the authoritative point-in-time value, so an event at the exact same instant carries
+ /// no new information. A buffered event whose entity is NOT in the snapshot is a genuine
+ /// gap-window event → DELIVER.
+ ///
+ ///
+ private void FlushBuffer(DebugViewSnapshot snapshot)
+ {
+ if (_preSnapshotBuffer.Count == 0) return;
+
+ // Build per-entity "as-of" timestamps from the snapshot. If (defensively) the
+ // snapshot lists the same entity twice, keep the newest timestamp.
+ var attrAsOf = new Dictionary();
+ foreach (var a in snapshot.AttributeValues)
+ {
+ var key = AttributeKey(a);
+ if (!attrAsOf.TryGetValue(key, out var existing) || a.Timestamp > existing)
+ attrAsOf[key] = a.Timestamp;
+ }
+
+ var alarmAsOf = new Dictionary();
+ foreach (var al in snapshot.AlarmStates)
+ {
+ var key = AlarmKey(al);
+ if (!alarmAsOf.TryGetValue(key, out var existing) || al.Timestamp > existing)
+ alarmAsOf[key] = al.Timestamp;
+ }
+
+ var flushed = 0;
+ var dropped = 0;
+ foreach (var evt in _preSnapshotBuffer)
+ {
+ if (IsReflectedInSnapshot(evt, attrAsOf, alarmAsOf))
+ {
+ dropped++;
+ continue;
+ }
+
+ _onEvent(evt);
+ flushed++;
+ }
+
+ if (dropped > 0 || flushed > 0)
+ {
+ _log.Debug("Flushed {0} buffered debug event(s) for {1}, dropped {2} as already-in-snapshot",
+ flushed, _instanceUniqueName, dropped);
+ }
+
+ _preSnapshotBuffer.Clear();
+ }
+
+ ///
+ /// Returns <see langword="true"/> when a buffered event is already reflected in the
+ /// snapshot (same entity, buffered timestamp &lt;= snapshot timestamp) and must be
+ /// dropped; otherwise <see langword="false"/> (deliver).
+ ///
+ private static bool IsReflectedInSnapshot(
+ object evt,
+ IReadOnlyDictionary attrAsOf,
+ IReadOnlyDictionary alarmAsOf)
+ {
+ switch (evt)
+ {
+ case AttributeValueChanged a:
+ return attrAsOf.TryGetValue(AttributeKey(a), out var attrTs) && a.Timestamp <= attrTs;
+ case AlarmStateChanged al:
+ return alarmAsOf.TryGetValue(AlarmKey(al), out var alarmTs) && al.Timestamp <= alarmTs;
+ default:
+ // Unknown buffered type (should not happen — only attr/alarm are buffered):
+ // never treat as a duplicate.
+ return false;
+ }
+ }
+
+ ///
+ /// Delimiter used to join identity components into a single dedup key. A NUL
+ /// control character cannot appear in an instance/attribute/alarm name, so
+ /// distinct identities never collide on a shared boundary (unlike a space, which
+ /// may legitimately occur within a name). Declared as an escaped char so the
+ /// source carries no raw NUL byte.
+ ///
+ private const char KeyDelimiter = '\u0000';
+
+ /// Per-entity dedup key for an attribute change.
+ private static string AttributeKey(AttributeValueChanged a) =>
+ string.Concat(a.InstanceUniqueName, KeyDelimiter, a.AttributePath, KeyDelimiter, a.AttributeName);
+
+ ///
+ /// Per-entity dedup key for an alarm change. Includes <see cref="AlarmStateChanged.SourceReference"/>
+ /// so native per-condition alarms (which share an AlarmName but differ by source
+ /// reference) are not conflated; empty for computed alarms.
+ ///
+ private static string AlarmKey(AlarmStateChanged al) =>
+ string.Concat(al.InstanceUniqueName, KeyDelimiter, al.AlarmName, KeyDelimiter, al.SourceReference);
+
///
protected override void PreStart()
{
_log.Info("Starting debug stream bridge for {0} on site {1}", _instanceUniqueName, _siteIdentifier);
+ // M2.18 stream-first: open the gRPC live-event subscription BEFORE (and
+ // alongside) requesting the snapshot, so events occurring during the
+ // snapshot-build + network-transit window are captured (buffered) and not lost.
+ OpenGrpcStream();
+
// Send subscribe request via CentralCommunicationActor for the initial snapshot.
var request = new SubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
var envelope = new SiteEnvelope(_siteIdentifier, request);
diff --git a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs
index bfbdf338..619b05c5 100644
--- a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs
+++ b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs
@@ -61,16 +61,22 @@ public class DebugStreamBridgeActorTests : TestKit
}
[Fact]
- public void On_InstanceNotFound_Snapshot_Forwards_To_OnEvent_Does_Not_Open_Stream_And_Terminates()
+ public void On_InstanceNotFound_Snapshot_Forwards_To_OnEvent_Tears_Down_Stream_And_Terminates()
{
- // M2.11: when the site reports InstanceNotFound=true the bridge actor must
+ // M2.11 (revised for M2.18 stream-first): the gRPC subscription is now opened
+ // up-front in PreStart, so when the site reports InstanceNotFound=true the
+ // bridge actor must
// (a) forward the not-found snapshot to _onEvent so DebugStreamService's TCS
// resolves and the caller can inspect the flag,
- // (b) NOT open a gRPC stream (SubscribeCalls must remain empty), and
+ // (b) tear DOWN the already-opened gRPC stream (Unsubscribe the just-opened
+ // correlation) rather than enter pass-through, and
// (c) stop itself cleanly.
var ctx = CreateBridgeActor();
ctx.CommProbe.ExpectMsg(); // initial subscribe envelope
+ // Stream-first: the gRPC subscription is opened before the snapshot arrives.
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
+
var notFoundSnapshot = new DebugViewSnapshot(
InstanceName,
new List(),
@@ -90,12 +96,12 @@ public class DebugStreamBridgeActorTests : TestKit
Assert.True(received.InstanceNotFound);
}
- // (b) no gRPC stream opened
- ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(3));
- Assert.Empty(ctx.MockGrpcClient.SubscribeCalls);
+ // (b) the just-opened gRPC stream is torn down (not left running / no pass-through)
+ AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Contains("corr-1"),
+ TimeSpan.FromSeconds(3));
// (c) actor terminates cleanly
- // ExpectTerminated above already verified termination
+ ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(3));
}
[Fact]
@@ -386,6 +392,369 @@ public class DebugStreamBridgeActorTests : TestKit
Assert.Equal("corr-1", factory.ClientFor(GrpcNodeB).SubscribeCalls[0].CorrelationId);
}
+ // ---------------------------------------------------------------------
+ // M2.18 (#26) — stream-first + replay/dedup
+ // ---------------------------------------------------------------------
+
+ [Fact]
+ public void PreStart_Opens_GrpcStream_Before_Snapshot_Arrives()
+ {
+ // M2.18: the gRPC subscription must be opened in PreStart (stream-first),
+ // BEFORE the snapshot is delivered, so live events start flowing during the
+ // snapshot-build + network-transit window. The old lifecycle opened the
+ // stream only after the snapshot arrived, losing gap-window events.
+ var ctx = CreateBridgeActor();
+ ctx.CommProbe.ExpectMsg(); // initial subscribe envelope
+
+ // No snapshot sent yet — the stream must already be open.
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
+ Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[0].CorrelationId);
+ Assert.Equal(InstanceName, ctx.MockGrpcClient.SubscribeCalls[0].InstanceUniqueName);
+
+ // _onEvent must NOT have fired — buffering, not delivering.
+ lock (ctx.ReceivedEvents) { Assert.Empty(ctx.ReceivedEvents); }
+ }
+
+ [Fact]
+ public void GapWindow_Event_Buffered_Before_Snapshot_Is_Delivered_Exactly_Once_After_Snapshot()
+ {
+ // M2.18: an event arriving DURING the snapshot window (before the snapshot
+ // is delivered) is buffered, then flushed exactly once AFTER the snapshot.
+ var ctx = CreateBridgeActor();
+ ctx.CommProbe.ExpectMsg();
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
+
+ // Live event arrives BEFORE the snapshot — its entity is NOT in the snapshot,
+ // so it is a genuine gap-window event that must survive.
+ var gapEvent = new AttributeValueChanged(InstanceName, "IO", "Pressure", 99.9, "Good",
+ DateTimeOffset.UtcNow);
+ ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(gapEvent);
+
+ // While buffering, _onEvent has not fired.
+ lock (ctx.ReceivedEvents) { Assert.Empty(ctx.ReceivedEvents); }
+
+ var snapshot = new DebugViewSnapshot(
+ InstanceName,
+ new List(),
+ new List(),
+ DateTimeOffset.UtcNow);
+ ctx.BridgeActor.Tell(snapshot);
+
+ // snapshot then the buffered gap-window event, exactly once, in that order.
+ AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
+ TimeSpan.FromSeconds(3));
+ lock (ctx.ReceivedEvents)
+ {
+ Assert.IsType(ctx.ReceivedEvents[0]);
+ var flushed = Assert.IsType(ctx.ReceivedEvents[1]);
+ Assert.Equal("Pressure", flushed.AttributeName);
+ }
+ }
+
+ [Fact]
+ public void Buffered_Event_Already_Reflected_In_Snapshot_Is_Dropped()
+ {
+ // M2.18 dedup: a buffered event whose entity is in the snapshot with an equal
+ // or newer snapshot timestamp (buffered.Timestamp <= snapshot.Timestamp) is
+ // already reflected and must be DROPPED.
+ var ctx = CreateBridgeActor();
+ ctx.CommProbe.ExpectMsg();
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
+
+ var t0 = DateTimeOffset.UtcNow;
+
+ // Buffered event for "Temp" at t0.
+ var buffered = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", t0);
+ ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(buffered);
+
+ // Snapshot already contains "Temp" at the SAME timestamp t0 → buffered is a dup.
+ var snapAttr = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", t0);
+ var snapshot = new DebugViewSnapshot(
+ InstanceName,
+ new List { snapAttr },
+ new List(),
+ t0);
+ ctx.BridgeActor.Tell(snapshot);
+
+ // Only the snapshot is delivered; the buffered duplicate is dropped.
+ AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
+ TimeSpan.FromSeconds(3));
+ // Give a beat to ensure no extra (dropped) event sneaks through.
+ Thread.Sleep(200);
+ lock (ctx.ReceivedEvents)
+ {
+ Assert.Single(ctx.ReceivedEvents);
+ Assert.IsType(ctx.ReceivedEvents[0]);
+ }
+ }
+
+ [Fact]
+ public void Buffered_Event_Strictly_Newer_Than_Snapshot_Entity_Is_Delivered()
+ {
+ // M2.18 dedup: a buffered event strictly newer than the snapshot's entry for
+ // the same entity (buffered.Timestamp > snapshot.Timestamp) is NOT a dup and
+ // must be DELIVERED after the snapshot.
+ var ctx = CreateBridgeActor();
+ ctx.CommProbe.ExpectMsg();
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
+
+ var snapTime = DateTimeOffset.UtcNow;
+ var newerTime = snapTime.AddMilliseconds(1);
+
+ // Buffered event for "Temp" strictly NEWER than the snapshot's "Temp".
+ var buffered = new AttributeValueChanged(InstanceName, "IO", "Temp", 50.0, "Good", newerTime);
+ ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(buffered);
+
+ var snapAttr = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", snapTime);
+ var snapshot = new DebugViewSnapshot(
+ InstanceName,
+ new List { snapAttr },
+ new List(),
+ snapTime);
+ ctx.BridgeActor.Tell(snapshot);
+
+ // snapshot then the strictly-newer buffered event.
+ AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
+ TimeSpan.FromSeconds(3));
+ lock (ctx.ReceivedEvents)
+ {
+ Assert.IsType(ctx.ReceivedEvents[0]);
+ var flushed = Assert.IsType(ctx.ReceivedEvents[1]);
+ Assert.Equal(50.0, flushed.Value);
+ Assert.Equal(newerTime, flushed.Timestamp);
+ }
+ }
+
+ [Fact]
+ public void Buffered_Alarm_Dedup_Uses_AlarmIdentity_And_Timestamp()
+ {
+ // M2.18 dedup for alarms: identity = (instance, alarm name, source reference).
+ // A buffered alarm older-or-equal to the snapshot's same-identity alarm is
+ // dropped; a strictly-newer one is delivered.
+ var ctx = CreateBridgeActor();
+ ctx.CommProbe.ExpectMsg();
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
+
+ var t0 = DateTimeOffset.UtcNow;
+
+ // Buffered: "PumpFault" at t0 (dup) and "Overheat" at t0+1ms (newer, deliver).
+ var dupAlarm = new AlarmStateChanged(InstanceName, "PumpFault",
+ ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 500, t0);
+ var newerAlarm = new AlarmStateChanged(InstanceName, "Overheat",
+ ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 700, t0.AddMilliseconds(1));
+ ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(dupAlarm);
+ ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(newerAlarm);
+
+ // Snapshot contains BOTH "PumpFault" and "Overheat" at t0.
+ var snapPumpFault = new AlarmStateChanged(InstanceName, "PumpFault",
+ ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 500, t0);
+ var snapOverheat = new AlarmStateChanged(InstanceName, "Overheat",
+ ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Normal, 0, t0);
+ var snapshot = new DebugViewSnapshot(
+ InstanceName,
+ new List(),
+ new List { snapPumpFault, snapOverheat },
+ t0);
+ ctx.BridgeActor.Tell(snapshot);
+
+ // snapshot + only the strictly-newer "Overheat" alarm (PumpFault dropped).
+ AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
+ TimeSpan.FromSeconds(3));
+ Thread.Sleep(200);
+ lock (ctx.ReceivedEvents)
+ {
+ Assert.Equal(2, ctx.ReceivedEvents.Count);
+ Assert.IsType(ctx.ReceivedEvents[0]);
+ var flushed = Assert.IsType(ctx.ReceivedEvents[1]);
+ Assert.Equal("Overheat", flushed.AlarmName);
+ Assert.Equal(700, flushed.Priority);
+ }
+ }
+
+ [Fact]
+ public void Buffered_Events_Flushed_In_Arrival_Order()
+ {
+ // M2.18: ordering preserved across multiple buffered events (none are dups —
+ // their entities are absent from the snapshot).
+ var ctx = CreateBridgeActor();
+ ctx.CommProbe.ExpectMsg();
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
+
+ var baseTime = DateTimeOffset.UtcNow;
+ var sub = ctx.MockGrpcClient.SubscribeCalls[0];
+ sub.OnEvent(new AttributeValueChanged(InstanceName, "IO", "A", 1, "Good", baseTime));
+ sub.OnEvent(new AlarmStateChanged(InstanceName, "AlarmX",
+ ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 100, baseTime));
+ sub.OnEvent(new AttributeValueChanged(InstanceName, "IO", "B", 2, "Good", baseTime));
+
+ var snapshot = new DebugViewSnapshot(
+ InstanceName,
+ new List(),
+ new List(),
+ baseTime);
+ ctx.BridgeActor.Tell(snapshot);
+
+ AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 4; } },
+ TimeSpan.FromSeconds(3));
+ lock (ctx.ReceivedEvents)
+ {
+ Assert.IsType(ctx.ReceivedEvents[0]);
+ Assert.Equal("A", Assert.IsType(ctx.ReceivedEvents[1]).AttributeName);
+ Assert.Equal("AlarmX", Assert.IsType(ctx.ReceivedEvents[2]).AlarmName);
+ Assert.Equal("B", Assert.IsType(ctx.ReceivedEvents[3]).AttributeName);
+ }
+ }
+
+ [Fact]
+ public void PassThrough_After_Flush_Delivers_Subsequent_Events_Immediately()
+ {
+ // M2.18: after the snapshot+flush the actor switches to pass-through — later
+ // events go straight to _onEvent (no buffering, no dup).
+ var ctx = CreateBridgeActor();
+ ctx.CommProbe.ExpectMsg();
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
+
+ var snapshot = new DebugViewSnapshot(
+ InstanceName,
+ new List(),
+ new List(),
+ DateTimeOffset.UtcNow);
+ ctx.BridgeActor.Tell(snapshot);
+
+ AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
+ TimeSpan.FromSeconds(3));
+
+ // Post-snapshot event — must be delivered immediately, exactly once.
+ var postEvent = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good",
+ DateTimeOffset.UtcNow);
+ ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(postEvent);
+
+ AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
+ TimeSpan.FromSeconds(3));
+ lock (ctx.ReceivedEvents)
+ {
+ Assert.IsType(ctx.ReceivedEvents[1]);
+ }
+ }
+
+ [Fact]
+ public void InstanceNotFound_After_StreamFirst_Tears_Down_Stream_And_Does_Not_PassThrough()
+ {
+ // M2.18 + M2.11: stream-first means the gRPC subscription is already open
+ // when an InstanceNotFound snapshot arrives. The bridge must tear that stream
+ // down (Unsubscribe the just-opened correlation), deliver the not-found
+ // snapshot, NOT enter pass-through, and stop cleanly.
+ var ctx = CreateBridgeActor();
+ ctx.CommProbe.ExpectMsg();
+
+ // Stream opened up-front (stream-first).
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
+
+ var notFoundSnapshot = new DebugViewSnapshot(
+ InstanceName,
+ new List(),
+ new List(),
+ DateTimeOffset.UtcNow,
+ InstanceNotFound: true);
+
+ Watch(ctx.BridgeActor);
+ ctx.BridgeActor.Tell(notFoundSnapshot);
+
+ // Not-found snapshot delivered.
+ AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
+ TimeSpan.FromSeconds(3));
+ lock (ctx.ReceivedEvents)
+ {
+ Assert.True(Assert.IsType(ctx.ReceivedEvents[0]).InstanceNotFound);
+ }
+
+ // The just-opened stream must be torn down.
+ AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Contains("corr-1"),
+ TimeSpan.FromSeconds(3));
+
+ // Stops cleanly.
+ ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(3));
+
+ // No pass-through: an event arriving after the stop is not delivered.
+ var late = new AttributeValueChanged(InstanceName, "IO", "Temp", 1, "Good", DateTimeOffset.UtcNow);
+ ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(late);
+ Thread.Sleep(200);
+ lock (ctx.ReceivedEvents) { Assert.Single(ctx.ReceivedEvents); }
+ }
+
+ [Fact]
+ public void Reconnect_During_Buffering_Phase_Keeps_Buffering_Until_Snapshot()
+ {
+ // M2.18: a gRPC error/reconnect BEFORE the snapshot arrives must remain in the
+ // buffering phase — events on the new stream are still buffered, then flushed
+ // when the snapshot finally arrives.
+ var ctx = CreateBridgeActor();
+ ctx.CommProbe.ExpectMsg();
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
+
+ // Error before snapshot → reconnect (still buffering).
+ ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("pre-snapshot blip"));
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
+
+ // Event on the reconnected stream — still buffered (snapshot not yet delivered).
+ var gapEvent = new AttributeValueChanged(InstanceName, "IO", "Late", 7, "Good",
+ DateTimeOffset.UtcNow);
+ ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(gapEvent);
+ lock (ctx.ReceivedEvents) { Assert.Empty(ctx.ReceivedEvents); }
+
+ var snapshot = new DebugViewSnapshot(
+ InstanceName,
+ new List(),
+ new List(),
+ DateTimeOffset.UtcNow);
+ ctx.BridgeActor.Tell(snapshot);
+
+ // snapshot + the event buffered across the reconnect.
+ AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
+ TimeSpan.FromSeconds(3));
+ lock (ctx.ReceivedEvents)
+ {
+ Assert.IsType(ctx.ReceivedEvents[0]);
+ Assert.Equal("Late", Assert.IsType(ctx.ReceivedEvents[1]).AttributeName);
+ }
+ }
+
+ [Fact]
+ public void Reconnect_After_Snapshot_Resumes_PassThrough_Not_Buffering()
+ {
+ // M2.18: a mid-session reconnect (after the snapshot was already delivered)
+ // must resume pass-through — the snapshot is a one-time thing and events on
+ // the reconnected stream are delivered immediately, not re-buffered.
+ var ctx = CreateBridgeActor();
+ ctx.CommProbe.ExpectMsg();
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3));
+
+ var snapshot = new DebugViewSnapshot(
+ InstanceName,
+ new List(),
+ new List(),
+ DateTimeOffset.UtcNow);
+ ctx.BridgeActor.Tell(snapshot);
+ AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } },
+ TimeSpan.FromSeconds(3));
+
+ // Mid-session reconnect.
+ ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("mid-session blip"));
+ AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5));
+
+ // Event on the reconnected stream — delivered immediately (pass-through).
+ var postEvent = new AttributeValueChanged(InstanceName, "IO", "Temp", 9, "Good",
+ DateTimeOffset.UtcNow);
+ ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(postEvent);
+
+ AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } },
+ TimeSpan.FromSeconds(3));
+ lock (ctx.ReceivedEvents)
+ {
+ Assert.Equal("Temp", Assert.IsType(ctx.ReceivedEvents[1]).AttributeName);
+ }
+ }
+
[Fact]
public void RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow()
{