From d8519cb464291e6653c1c2593d8ed9e04efaf115 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 16 Jun 2026 07:33:26 -0400 Subject: [PATCH] fix(debug-stream): stream-first lifecycle with replay/dedup (#26, M2.18) Re-architect DebugStreamBridgeActor from snapshot-first to stream-first so no attribute/alarm event occurring during the snapshot-build + network-transit window is lost (#26). Lifecycle change: - PreStart now opens the gRPC subscription FIRST (alongside sending the SubscribeDebugViewRequest), so live events start flowing immediately. - Phase model via a single _snapshotDelivered flag (mutated only on the actor thread). While buffering (snapshot not yet delivered), AttributeValueChanged/ AlarmStateChanged are appended to an ordered _preSnapshotBuffer instead of being delivered. After snapshot+flush, the same handlers pass through directly. - On DebugViewSnapshot: deliver snapshot, then flush the buffer in arrival order with per-entity dedup, then set _snapshotDelivered=true (pass-through). Dedup rule (exactly-once): - Identity: attributes by (InstanceUniqueName, AttributePath, AttributeName); alarms by (InstanceUniqueName, AlarmName, SourceReference) so native per-condition alarms are not conflated. Keys joined with a NUL delimiter (declared as an escaped char constant; no raw NUL in source) so distinct identities never collide on a space within a name. - Boundary: a buffered event whose timestamp is <= the snapshot's timestamp for the same entity is already reflected -> DROP; strictly-newer (>) -> DELIVER; entity absent from the snapshot -> DELIVER (genuine gap-window event). Preserved paths: - M2.11 InstanceNotFound: with stream-first the gRPC stream is already open, so the not-found path now tears it down (CleanupGrpc) + clears the buffer, does NOT enter pass-through, delivers the not-found snapshot, and stops cleanly. - Reconnect (ReconnectGrpcStream -> OpenGrpcStream) does not touch the phase flag: a mid-session reconnect resumes pass-through; a reconnect during the buffering phase stays buffering until the snapshot arrives. - Communication-008 retry/stability/stop/terminate + ReceiveTimeout orphan net unchanged. Duplicate/late snapshot after delivery is ignored defensively. Tests: 10 new M2.18 tests (stream-first ordering, gap-window buffering, dedup drop/deliver for attrs + alarms, ordering, pass-through, InstanceNotFound teardown, reconnect-during-buffering, reconnect-after-snapshot) + revised the M2.11 not-found test to assert stream teardown. Full DebugStreamBridgeActor class green: 23/23. --- ...illpending-m2-implementation.md.tasks.json | 10 +- .../Actors/DebugStreamBridgeActor.cs | 226 ++++++++++- .../Grpc/DebugStreamBridgeActorTests.cs | 383 +++++++++++++++++- 3 files changed, 595 insertions(+), 24 deletions(-) diff --git a/docs/plans/2026-06-15-stillpending-m2-implementation.md.tasks.json b/docs/plans/2026-06-15-stillpending-m2-implementation.md.tasks.json index f20e8fe8..2ab026f3 100644 --- a/docs/plans/2026-06-15-stillpending-m2-implementation.md.tasks.json +++ b/docs/plans/2026-06-15-stillpending-m2-implementation.md.tasks.json @@ -15,11 +15,11 @@ {"id": 43, "ref": "M2.11", "subject": "M2.11 #24: debug snapshot unknown-instance returns error", "class": "small", "status": "completed", "commits": ["dbf44b9", "d160c7f"]}, {"id": 44, "ref": "M2.12", "subject": "M2.12 #25: recursion-limit error to site event log", "class": "small", "status": "completed", "commits": ["f08038d", "e2b31a9"]}, {"id": 45, "ref": "M2.13", "subject": "M2.13 #27: populate obtainable OPC UA/MxGateway transition fields", "class": "small", "status": "completed", "commits": ["722b866", "3945789"]}, - {"id": 46, "ref": "M2.14", "subject": "M2.14 #28: readiness gate checks required cluster singletons", "class": "standard", "status": "completed", "commits": ["253bec5"]}, - {"id": 47, "ref": "M2.15", "subject": "M2.15 #29: register site active-node purge gate (DI)", "class": "small", "status": "pending"}, - {"id": 48, "ref": "M2.16", "subject": "M2.16 #30: Health Monitoring consumes FailedWriteCount", "class": "small", "status": "pending"}, - {"id": 49, "ref": "M2.17", "subject": "M2.17 #31: reconcile StateTransitionValidator delete-from-NotDeployed", "class": "small", "status": "pending"}, - {"id": 50, "ref": "M2.18", "subject": "M2.18 #26: debug-stream stream-first ordering + replay/dedup", "class": "high-risk", "status": "pending"}, + {"id": 46, "ref": "M2.14", "subject": "M2.14 #28: readiness gate checks required cluster singletons", "class": "standard", "status": "completed", "commits": ["253bec5", "6b1cb9e"]}, + {"id": 47, "ref": "M2.15", "subject": "M2.15 #29: register site active-node purge gate (DI)", "class": "small", "status": "completed", "commits": ["e1ee37e"]}, + {"id": 48, "ref": "M2.16", "subject": "M2.16 #30: Health Monitoring consumes FailedWriteCount", "class": "small", "status": "completed", "commits": ["d81f747", "c9244d8"]}, + {"id": 49, "ref": "M2.17", "subject": "M2.17 #31: reconcile StateTransitionValidator delete-from-NotDeployed", "class": "small", "status": "completed", "commits": ["c104356"]}, + {"id": 50, "ref": "M2.18", "subject": "M2.18 #26: debug-stream stream-first ordering + replay/dedup", "class": "high-risk", "status": "completed", "commits": ["3e31bd8"]}, {"id": 51, "ref": "M2.19", "subject": "M2.19 #15: LDAP periodic re-query for interactive sessions (spike+impl)", "class": "high-risk", "status": "pending"} ], "deferred": [ diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs index 8e0ecb1b..941b38f8 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/DebugStreamBridgeActor.cs @@ -10,10 +10,24 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Actors; /// Long-lived (one per active debug session) actor on the central side. Debug sessions /// are session-based and temporary — this actor holds no persisted state and does not /// derive from an Akka.Persistence base class; its state does not survive a restart. -/// Sends SubscribeDebugViewRequest to the site via CentralCommunicationActor (with THIS actor -/// as the Sender) to get the initial snapshot. After receiving the snapshot, opens a gRPC -/// server-streaming subscription via SiteStreamGrpcClient for ongoing events. -/// Stream events are marshalled back to the actor via Self.Tell for thread safety. +/// +/// Stream-first lifecycle (M2.18, #26). To avoid losing any +/// / that occurs on +/// the site during the snapshot-build + network-transit window, the gRPC server-streaming +/// subscription is opened FIRST (in ), alongside the +/// SubscribeDebugViewRequest sent to the site via CentralCommunicationActor (with +/// THIS actor as the Sender). Live events that arrive before the +/// is delivered are buffered in arrival order. +/// When the snapshot arrives it is delivered to the consumer, then the buffer is flushed +/// in order, deduped against the snapshot (an event whose per-entity timestamp is +/// <= the snapshot's timestamp for the same entity is already reflected → dropped; a +/// strictly-newer event is delivered; an event for an entity absent from the snapshot is +/// delivered). After the flush the actor switches to pass-through: subsequent events go +/// straight to the consumer. A mid-session reconnect (after the snapshot) resumes +/// pass-through — the snapshot is a one-time thing. +/// +/// Stream events are marshalled back to the actor via Self.Tell for thread safety; all +/// state (phase flag + buffer) is mutated only on the actor thread. /// public class DebugStreamBridgeActor : ReceiveActor, IWithTimers { @@ -49,6 +63,31 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers private bool _stopped; private CancellationTokenSource? _grpcCts; + /// + /// Phase flag (M2.18). until the initial + /// has been delivered and the pre-snapshot buffer + /// flushed; thereafter (pass-through). Mutated only on the + /// actor thread. A reconnect does NOT touch this flag — a mid-session reconnect + /// (after the snapshot) therefore stays in pass-through, and a reconnect during the + /// buffering phase (before the snapshot) stays buffering. + /// + private bool _snapshotDelivered; + + /// + /// Ordered buffer of live gRPC events (/ + /// ) that arrived before the snapshot was delivered. + /// Flushed (with per-entity dedup against the snapshot) when the snapshot arrives, + /// then never used again. Mutated only on the actor thread. + /// + private readonly List _preSnapshotBuffer = new(); + + /// + /// Defensive log threshold: if the pre-snapshot buffer grows past this many events + /// during a slow snapshot we log once (events are NOT dropped — the window is short). + /// + private const int BufferWarnThreshold = 10_000; + private bool _bufferWarned; + /// Timer scheduler for reconnect and stability window timers. public ITimerScheduler Timers { get; set; } = null!; @@ -87,16 +126,33 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers // Initial snapshot response from the site (via ClusterClient). // M2.11: if the site reports InstanceNotFound=true the instance is not - // deployed there. Forward the snapshot (with InstanceNotFound=true) to - // _onEvent so DebugStreamService's TCS resolves and the caller can - // inspect the flag; then stop cleanly without opening a gRPC stream. + // deployed there. M2.18: under the stream-first lifecycle the gRPC stream + // was already opened in PreStart, so the not-found path must tear it down + // (CleanupGrpc) rather than enter pass-through. Forward the snapshot (with + // InstanceNotFound=true) to _onEvent so DebugStreamService's TCS resolves and + // the caller can inspect the flag; then stop cleanly. Receive(snapshot => { + if (_snapshotDelivered) + { + // Defensive: a duplicate / late snapshot after we have already moved to + // pass-through. The snapshot is a one-time thing — ignore replays so we + // never re-buffer or double-deliver. + _log.Debug("Ignoring duplicate DebugViewSnapshot for {0} (already delivered)", + _instanceUniqueName); + return; + } + if (snapshot.InstanceNotFound) { _log.Warning("Instance {0} is not deployed on site; terminating debug stream", _instanceUniqueName); _stopped = true; + // M2.18: the stream-first subscription opened in PreStart is for a + // non-deployed instance — cancel it (and any buffered gap events are + // discarded with the actor). No pass-through. + CleanupGrpc(); + _preSnapshotBuffer.Clear(); _onEvent(snapshot); // resolves the snapshot TCS with InstanceNotFound=true // Note: after Context.Stop(Self) below the actor is dead. DebugStreamService // inspects InitialSnapshot.InstanceNotFound and calls StopStream, which sends @@ -106,10 +162,15 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers return; } - _log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms)", - _instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count); + _log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms); flushing {3} buffered event(s)", + _instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count, + _preSnapshotBuffer.Count); + + // Deliver the snapshot, then flush the gap-window buffer (deduped), then + // switch to pass-through. Order matters: snapshot first, buffered events next. _onEvent(snapshot); - OpenGrpcStream(); + FlushBuffer(snapshot); + _snapshotDelivered = true; }); // Domain events arriving via Self.Tell from gRPC callback. @@ -117,8 +178,11 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers // flapping stream that delivers a single event between failures would // otherwise never trip MaxRetries. The retry budget is recovered only by // GrpcStreamStable (a stream that has stayed up for StabilityWindow). - Receive(changed => _onEvent(changed)); - Receive(changed => _onEvent(changed)); + // M2.18: before the snapshot has been delivered, BUFFER (in arrival order) + // rather than deliver — these may be gap-window events. After the snapshot has + // been flushed, pass through directly (same handler, phase-dependent behavior). + Receive(changed => HandleStreamEvent(changed)); + Receive(changed => HandleStreamEvent(changed)); // Stream has been stably connected for StabilityWindow — recover the // retry budget so a future transient fault gets a fresh set of retries. @@ -173,11 +237,149 @@ public class DebugStreamBridgeActor : ReceiveActor, IWithTimers }); } + /// + /// Handles a live gRPC stream event ( or + /// ). Before the snapshot has been delivered the + /// event is appended to the ordered pre-snapshot buffer (gap-window capture); after + /// the snapshot+flush it is passed straight through to the consumer. Always runs on + /// the actor thread (events are marshalled in via Self.Tell), so the phase flag and + /// buffer are accessed without locking. + /// + private void HandleStreamEvent(object evt) + { + if (_snapshotDelivered) + { + _onEvent(evt); + return; + } + + _preSnapshotBuffer.Add(evt); + if (!_bufferWarned && _preSnapshotBuffer.Count > BufferWarnThreshold) + { + _bufferWarned = true; + _log.Warning( + "Pre-snapshot debug-event buffer for {0} exceeded {1} events while awaiting the snapshot; " + + "events are still retained (not dropped).", + _instanceUniqueName, BufferWarnThreshold); + } + } + + /// + /// Flushes the pre-snapshot buffer in arrival order, deduping each event against the + /// just-delivered snapshot (M2.18). + /// + /// Dedup rule. Identity is per-entity: + /// attributes by (InstanceUniqueName, AttributePath, AttributeName); alarms by + /// (InstanceUniqueName, AlarmName, SourceReference). For a buffered event whose entity + /// is present in the snapshot, the comparison is against that entity's snapshot + /// timestamp: a buffered timestamp <= the snapshot timestamp means the event is + /// already reflected in the snapshot → DROP; a strictly-newer (>) timestamp means + /// the event happened after the snapshot was built → DELIVER. The boundary is inclusive + /// on the snapshot side (equal timestamps are treated as duplicates) — the snapshot is + /// the authoritative point-in-time value, so an event at the exact same instant carries + /// no new information. A buffered event whose entity is NOT in the snapshot is a genuine + /// gap-window event → DELIVER. + /// + /// + private void FlushBuffer(DebugViewSnapshot snapshot) + { + if (_preSnapshotBuffer.Count == 0) return; + + // Build per-entity "as-of" timestamps from the snapshot. If (defensively) the + // snapshot lists the same entity twice, keep the newest timestamp. + var attrAsOf = new Dictionary(); + foreach (var a in snapshot.AttributeValues) + { + var key = AttributeKey(a); + if (!attrAsOf.TryGetValue(key, out var existing) || a.Timestamp > existing) + attrAsOf[key] = a.Timestamp; + } + + var alarmAsOf = new Dictionary(); + foreach (var al in snapshot.AlarmStates) + { + var key = AlarmKey(al); + if (!alarmAsOf.TryGetValue(key, out var existing) || al.Timestamp > existing) + alarmAsOf[key] = al.Timestamp; + } + + var flushed = 0; + var dropped = 0; + foreach (var evt in _preSnapshotBuffer) + { + if (IsReflectedInSnapshot(evt, attrAsOf, alarmAsOf)) + { + dropped++; + continue; + } + + _onEvent(evt); + flushed++; + } + + if (dropped > 0 || flushed > 0) + { + _log.Debug("Flushed {0} buffered debug event(s) for {1}, dropped {2} as already-in-snapshot", + flushed, _instanceUniqueName, dropped); + } + + _preSnapshotBuffer.Clear(); + } + + /// + /// Returns when a buffered event is already reflected in the + /// snapshot (same entity, buffered timestamp <= snapshot timestamp) and must be + /// dropped; otherwise (deliver). + /// + private static bool IsReflectedInSnapshot( + object evt, + IReadOnlyDictionary attrAsOf, + IReadOnlyDictionary alarmAsOf) + { + switch (evt) + { + case AttributeValueChanged a: + return attrAsOf.TryGetValue(AttributeKey(a), out var attrTs) && a.Timestamp <= attrTs; + case AlarmStateChanged al: + return alarmAsOf.TryGetValue(AlarmKey(al), out var alarmTs) && al.Timestamp <= alarmTs; + default: + // Unknown buffered type (should not happen — only attr/alarm are buffered): + // never treat as a duplicate. + return false; + } + } + + /// + /// Delimiter used to join identity components into a single dedup key. A NUL + /// control character cannot appear in an instance/attribute/alarm name, so + /// distinct identities never collide on a shared boundary (unlike a space, which + /// may legitimately occur within a name). Declared as an escaped char so the + /// source carries no raw NUL byte. + /// + private const char KeyDelimiter = '\u0000'; + + /// Per-entity dedup key for an attribute change. + private static string AttributeKey(AttributeValueChanged a) => + string.Concat(a.InstanceUniqueName, KeyDelimiter, a.AttributePath, KeyDelimiter, a.AttributeName); + + /// + /// Per-entity dedup key for an alarm change. Includes + /// so native per-condition alarms (which share an AlarmName but differ by source + /// reference) are not conflated; empty for computed alarms. + /// + private static string AlarmKey(AlarmStateChanged al) => + string.Concat(al.InstanceUniqueName, KeyDelimiter, al.AlarmName, KeyDelimiter, al.SourceReference); + /// protected override void PreStart() { _log.Info("Starting debug stream bridge for {0} on site {1}", _instanceUniqueName, _siteIdentifier); + // M2.18 stream-first: open the gRPC live-event subscription BEFORE (and + // alongside) requesting the snapshot, so events occurring during the + // snapshot-build + network-transit window are captured (buffered) and not lost. + OpenGrpcStream(); + // Send subscribe request via CentralCommunicationActor for the initial snapshot. var request = new SubscribeDebugViewRequest(_instanceUniqueName, _correlationId); var envelope = new SiteEnvelope(_siteIdentifier, request); diff --git a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs index bfbdf338..619b05c5 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/DebugStreamBridgeActorTests.cs @@ -61,16 +61,22 @@ public class DebugStreamBridgeActorTests : TestKit } [Fact] - public void On_InstanceNotFound_Snapshot_Forwards_To_OnEvent_Does_Not_Open_Stream_And_Terminates() + public void On_InstanceNotFound_Snapshot_Forwards_To_OnEvent_Tears_Down_Stream_And_Terminates() { - // M2.11: when the site reports InstanceNotFound=true the bridge actor must + // M2.11 (revised for M2.18 stream-first): the gRPC subscription is now opened + // up-front in PreStart, so when the site reports InstanceNotFound=true the + // bridge actor must // (a) forward the not-found snapshot to _onEvent so DebugStreamService's TCS // resolves and the caller can inspect the flag, - // (b) NOT open a gRPC stream (SubscribeCalls must remain empty), and + // (b) tear DOWN the already-opened gRPC stream (Unsubscribe the just-opened + // correlation) rather than enter pass-through, and // (c) stop itself cleanly. var ctx = CreateBridgeActor(); ctx.CommProbe.ExpectMsg(); // initial subscribe envelope + // Stream-first: the gRPC subscription is opened before the snapshot arrives. + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + var notFoundSnapshot = new DebugViewSnapshot( InstanceName, new List(), @@ -90,12 +96,12 @@ public class DebugStreamBridgeActorTests : TestKit Assert.True(received.InstanceNotFound); } - // (b) no gRPC stream opened - ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(3)); - Assert.Empty(ctx.MockGrpcClient.SubscribeCalls); + // (b) the just-opened gRPC stream is torn down (not left running / no pass-through) + AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Contains("corr-1"), + TimeSpan.FromSeconds(3)); // (c) actor terminates cleanly - // ExpectTerminated above already verified termination + ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(3)); } [Fact] @@ -386,6 +392,369 @@ public class DebugStreamBridgeActorTests : TestKit Assert.Equal("corr-1", factory.ClientFor(GrpcNodeB).SubscribeCalls[0].CorrelationId); } + // --------------------------------------------------------------------- + // M2.18 (#26) — stream-first + replay/dedup + // --------------------------------------------------------------------- + + [Fact] + public void PreStart_Opens_GrpcStream_Before_Snapshot_Arrives() + { + // M2.18: the gRPC subscription must be opened in PreStart (stream-first), + // BEFORE the snapshot is delivered, so live events start flowing during the + // snapshot-build + network-transit window. The old lifecycle opened the + // stream only after the snapshot arrived, losing gap-window events. + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); // initial subscribe envelope + + // No snapshot sent yet — the stream must already be open. + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + Assert.Equal("corr-1", ctx.MockGrpcClient.SubscribeCalls[0].CorrelationId); + Assert.Equal(InstanceName, ctx.MockGrpcClient.SubscribeCalls[0].InstanceUniqueName); + + // _onEvent must NOT have fired — buffering, not delivering. + lock (ctx.ReceivedEvents) { Assert.Empty(ctx.ReceivedEvents); } + } + + [Fact] + public void GapWindow_Event_Buffered_Before_Snapshot_Is_Delivered_Exactly_Once_After_Snapshot() + { + // M2.18: an event arriving DURING the snapshot window (before the snapshot + // is delivered) is buffered, then flushed exactly once AFTER the snapshot. + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + // Live event arrives BEFORE the snapshot — its entity is NOT in the snapshot, + // so it is a genuine gap-window event that must survive. + var gapEvent = new AttributeValueChanged(InstanceName, "IO", "Pressure", 99.9, "Good", + DateTimeOffset.UtcNow); + ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(gapEvent); + + // While buffering, _onEvent has not fired. + lock (ctx.ReceivedEvents) { Assert.Empty(ctx.ReceivedEvents); } + + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow); + ctx.BridgeActor.Tell(snapshot); + + // snapshot then the buffered gap-window event, exactly once, in that order. + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } }, + TimeSpan.FromSeconds(3)); + lock (ctx.ReceivedEvents) + { + Assert.IsType(ctx.ReceivedEvents[0]); + var flushed = Assert.IsType(ctx.ReceivedEvents[1]); + Assert.Equal("Pressure", flushed.AttributeName); + } + } + + [Fact] + public void Buffered_Event_Already_Reflected_In_Snapshot_Is_Dropped() + { + // M2.18 dedup: a buffered event whose entity is in the snapshot with an equal + // or newer snapshot timestamp (buffered.Timestamp <= snapshot.Timestamp) is + // already reflected and must be DROPPED. + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + var t0 = DateTimeOffset.UtcNow; + + // Buffered event for "Temp" at t0. + var buffered = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", t0); + ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(buffered); + + // Snapshot already contains "Temp" at the SAME timestamp t0 → buffered is a dup. + var snapAttr = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", t0); + var snapshot = new DebugViewSnapshot( + InstanceName, + new List { snapAttr }, + new List(), + t0); + ctx.BridgeActor.Tell(snapshot); + + // Only the snapshot is delivered; the buffered duplicate is dropped. + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } }, + TimeSpan.FromSeconds(3)); + // Give a beat to ensure no extra (dropped) event sneaks through. + Thread.Sleep(200); + lock (ctx.ReceivedEvents) + { + Assert.Single(ctx.ReceivedEvents); + Assert.IsType(ctx.ReceivedEvents[0]); + } + } + + [Fact] + public void Buffered_Event_Strictly_Newer_Than_Snapshot_Entity_Is_Delivered() + { + // M2.18 dedup: a buffered event strictly newer than the snapshot's entry for + // the same entity (buffered.Timestamp > snapshot.Timestamp) is NOT a dup and + // must be DELIVERED after the snapshot. + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + var snapTime = DateTimeOffset.UtcNow; + var newerTime = snapTime.AddMilliseconds(1); + + // Buffered event for "Temp" strictly NEWER than the snapshot's "Temp". + var buffered = new AttributeValueChanged(InstanceName, "IO", "Temp", 50.0, "Good", newerTime); + ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(buffered); + + var snapAttr = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", snapTime); + var snapshot = new DebugViewSnapshot( + InstanceName, + new List { snapAttr }, + new List(), + snapTime); + ctx.BridgeActor.Tell(snapshot); + + // snapshot then the strictly-newer buffered event. + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } }, + TimeSpan.FromSeconds(3)); + lock (ctx.ReceivedEvents) + { + Assert.IsType(ctx.ReceivedEvents[0]); + var flushed = Assert.IsType(ctx.ReceivedEvents[1]); + Assert.Equal(50.0, flushed.Value); + Assert.Equal(newerTime, flushed.Timestamp); + } + } + + [Fact] + public void Buffered_Alarm_Dedup_Uses_AlarmIdentity_And_Timestamp() + { + // M2.18 dedup for alarms: identity = (instance, alarm name, source reference). + // A buffered alarm older-or-equal to the snapshot's same-identity alarm is + // dropped; a strictly-newer one is delivered. + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + var t0 = DateTimeOffset.UtcNow; + + // Buffered: "PumpFault" at t0 (dup) and "Overheat" at t0+1ms (newer, deliver). + var dupAlarm = new AlarmStateChanged(InstanceName, "PumpFault", + ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 500, t0); + var newerAlarm = new AlarmStateChanged(InstanceName, "Overheat", + ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 700, t0.AddMilliseconds(1)); + ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(dupAlarm); + ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(newerAlarm); + + // Snapshot contains BOTH "PumpFault" and "Overheat" at t0. + var snapPumpFault = new AlarmStateChanged(InstanceName, "PumpFault", + ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 500, t0); + var snapOverheat = new AlarmStateChanged(InstanceName, "Overheat", + ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Normal, 0, t0); + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List { snapPumpFault, snapOverheat }, + t0); + ctx.BridgeActor.Tell(snapshot); + + // snapshot + only the strictly-newer "Overheat" alarm (PumpFault dropped). + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } }, + TimeSpan.FromSeconds(3)); + Thread.Sleep(200); + lock (ctx.ReceivedEvents) + { + Assert.Equal(2, ctx.ReceivedEvents.Count); + Assert.IsType(ctx.ReceivedEvents[0]); + var flushed = Assert.IsType(ctx.ReceivedEvents[1]); + Assert.Equal("Overheat", flushed.AlarmName); + Assert.Equal(700, flushed.Priority); + } + } + + [Fact] + public void Buffered_Events_Flushed_In_Arrival_Order() + { + // M2.18: ordering preserved across multiple buffered events (none are dups — + // their entities are absent from the snapshot). + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + var baseTime = DateTimeOffset.UtcNow; + var sub = ctx.MockGrpcClient.SubscribeCalls[0]; + sub.OnEvent(new AttributeValueChanged(InstanceName, "IO", "A", 1, "Good", baseTime)); + sub.OnEvent(new AlarmStateChanged(InstanceName, "AlarmX", + ZB.MOM.WW.ScadaBridge.Commons.Types.Enums.AlarmState.Active, 100, baseTime)); + sub.OnEvent(new AttributeValueChanged(InstanceName, "IO", "B", 2, "Good", baseTime)); + + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + baseTime); + ctx.BridgeActor.Tell(snapshot); + + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 4; } }, + TimeSpan.FromSeconds(3)); + lock (ctx.ReceivedEvents) + { + Assert.IsType(ctx.ReceivedEvents[0]); + Assert.Equal("A", Assert.IsType(ctx.ReceivedEvents[1]).AttributeName); + Assert.Equal("AlarmX", Assert.IsType(ctx.ReceivedEvents[2]).AlarmName); + Assert.Equal("B", Assert.IsType(ctx.ReceivedEvents[3]).AttributeName); + } + } + + [Fact] + public void PassThrough_After_Flush_Delivers_Subsequent_Events_Immediately() + { + // M2.18: after the snapshot+flush the actor switches to pass-through — later + // events go straight to _onEvent (no buffering, no dup). + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow); + ctx.BridgeActor.Tell(snapshot); + + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } }, + TimeSpan.FromSeconds(3)); + + // Post-snapshot event — must be delivered immediately, exactly once. + var postEvent = new AttributeValueChanged(InstanceName, "IO", "Temp", 42.5, "Good", + DateTimeOffset.UtcNow); + ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(postEvent); + + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } }, + TimeSpan.FromSeconds(3)); + lock (ctx.ReceivedEvents) + { + Assert.IsType(ctx.ReceivedEvents[1]); + } + } + + [Fact] + public void InstanceNotFound_After_StreamFirst_Tears_Down_Stream_And_Does_Not_PassThrough() + { + // M2.18 + M2.11: stream-first means the gRPC subscription is already open + // when an InstanceNotFound snapshot arrives. The bridge must tear that stream + // down (Unsubscribe the just-opened correlation), deliver the not-found + // snapshot, NOT enter pass-through, and stop cleanly. + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + + // Stream opened up-front (stream-first). + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + var notFoundSnapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow, + InstanceNotFound: true); + + Watch(ctx.BridgeActor); + ctx.BridgeActor.Tell(notFoundSnapshot); + + // Not-found snapshot delivered. + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } }, + TimeSpan.FromSeconds(3)); + lock (ctx.ReceivedEvents) + { + Assert.True(Assert.IsType(ctx.ReceivedEvents[0]).InstanceNotFound); + } + + // The just-opened stream must be torn down. + AwaitCondition(() => ctx.MockGrpcClient.UnsubscribedCorrelationIds.Contains("corr-1"), + TimeSpan.FromSeconds(3)); + + // Stops cleanly. + ExpectTerminated(ctx.BridgeActor, TimeSpan.FromSeconds(3)); + + // No pass-through: an event arriving after the stop is not delivered. + var late = new AttributeValueChanged(InstanceName, "IO", "Temp", 1, "Good", DateTimeOffset.UtcNow); + ctx.MockGrpcClient.SubscribeCalls[0].OnEvent(late); + Thread.Sleep(200); + lock (ctx.ReceivedEvents) { Assert.Single(ctx.ReceivedEvents); } + } + + [Fact] + public void Reconnect_During_Buffering_Phase_Keeps_Buffering_Until_Snapshot() + { + // M2.18: a gRPC error/reconnect BEFORE the snapshot arrives must remain in the + // buffering phase — events on the new stream are still buffered, then flushed + // when the snapshot finally arrives. + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + // Error before snapshot → reconnect (still buffering). + ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("pre-snapshot blip")); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5)); + + // Event on the reconnected stream — still buffered (snapshot not yet delivered). + var gapEvent = new AttributeValueChanged(InstanceName, "IO", "Late", 7, "Good", + DateTimeOffset.UtcNow); + ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(gapEvent); + lock (ctx.ReceivedEvents) { Assert.Empty(ctx.ReceivedEvents); } + + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow); + ctx.BridgeActor.Tell(snapshot); + + // snapshot + the event buffered across the reconnect. + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } }, + TimeSpan.FromSeconds(3)); + lock (ctx.ReceivedEvents) + { + Assert.IsType(ctx.ReceivedEvents[0]); + Assert.Equal("Late", Assert.IsType(ctx.ReceivedEvents[1]).AttributeName); + } + } + + [Fact] + public void Reconnect_After_Snapshot_Resumes_PassThrough_Not_Buffering() + { + // M2.18: a mid-session reconnect (after the snapshot was already delivered) + // must resume pass-through — the snapshot is a one-time thing and events on + // the reconnected stream are delivered immediately, not re-buffered. + var ctx = CreateBridgeActor(); + ctx.CommProbe.ExpectMsg(); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 1, TimeSpan.FromSeconds(3)); + + var snapshot = new DebugViewSnapshot( + InstanceName, + new List(), + new List(), + DateTimeOffset.UtcNow); + ctx.BridgeActor.Tell(snapshot); + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 1; } }, + TimeSpan.FromSeconds(3)); + + // Mid-session reconnect. + ctx.MockGrpcClient.SubscribeCalls[0].OnError(new Exception("mid-session blip")); + AwaitCondition(() => ctx.MockGrpcClient.SubscribeCalls.Count == 2, TimeSpan.FromSeconds(5)); + + // Event on the reconnected stream — delivered immediately (pass-through). + var postEvent = new AttributeValueChanged(InstanceName, "IO", "Temp", 9, "Good", + DateTimeOffset.UtcNow); + ctx.MockGrpcClient.SubscribeCalls[1].OnEvent(postEvent); + + AwaitCondition(() => { lock (ctx.ReceivedEvents) { return ctx.ReceivedEvents.Count == 2; } }, + TimeSpan.FromSeconds(3)); + lock (ctx.ReceivedEvents) + { + Assert.Equal("Temp", Assert.IsType(ctx.ReceivedEvents[1]).AttributeName); + } + } + [Fact] public void RetryCount_RecoveredOnlyAfterStreamStaysStableForStabilityWindow() {