From 49f042a93783ec58c57ef03021f3f74f2d5716a8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 21 Mar 2026 12:18:52 -0400 Subject: [PATCH] refactor: remove ClusterClient streaming path (DebugStreamEvent), events flow via gRPC --- .../Messages/DebugView/DebugStreamEvent.cs | 8 --- .../Actors/CentralCommunicationActor.cs | 14 ------ .../Actors/SiteCommunicationActor.cs | 6 --- .../Actors/InstanceActor.cs | 30 ++--------- .../ArchitecturalConstraintTests.cs | 9 ++++ .../Actors/InstanceActorIntegrationTests.cs | 50 ++++++------------- 6 files changed, 27 insertions(+), 90 deletions(-) delete mode 100644 src/ScadaLink.Commons/Messages/DebugView/DebugStreamEvent.cs diff --git a/src/ScadaLink.Commons/Messages/DebugView/DebugStreamEvent.cs b/src/ScadaLink.Commons/Messages/DebugView/DebugStreamEvent.cs deleted file mode 100644 index d9afa01..0000000 --- a/src/ScadaLink.Commons/Messages/DebugView/DebugStreamEvent.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace ScadaLink.Commons.Messages.DebugView; - -/// -/// Wraps a debug stream event (AttributeValueChanged or AlarmStateChanged) with -/// the correlationId for routing back to the correct DebugStreamBridgeActor on central. -/// Sent from InstanceActor → SiteCommunicationActor → ClusterClient → CentralCommunicationActor. -/// -public record DebugStreamEvent(string CorrelationId, object Event); diff --git a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs index 03b971c..080661f 100644 --- a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs @@ -86,8 +86,6 @@ public class CentralCommunicationActor : ReceiveActor // Route enveloped messages to sites Receive(HandleSiteEnvelope); - // Route debug stream events from sites to the correct bridge actor - Receive(HandleDebugStreamEvent); } private void HandleHeartbeat(HeartbeatMessage heartbeat) @@ -96,18 +94,6 @@ public class CentralCommunicationActor : ReceiveActor Context.Parent.Tell(heartbeat); } - private void HandleDebugStreamEvent(Commons.Messages.DebugView.DebugStreamEvent msg) - { - if (_debugSubscriptions.TryGetValue(msg.CorrelationId, out var entry)) - { - entry.Subscriber.Tell(msg.Event); - } - else - { - _log.Debug("No debug subscription found for correlationId {0}, dropping event", msg.CorrelationId); - } - } - private void HandleSiteHealthReport(SiteHealthReport report) { var aggregator = _serviceProvider.GetService(); diff --git a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs index faec802..35f300c 100644 --- a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs @@ -146,12 +146,6 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers new ClusterClient.Send("/user/central-communication", msg), Self); }); - // Internal: forward debug stream events to central (site→central streaming) - Receive(msg => - { - _centralClient?.Tell( - new ClusterClient.Send("/user/central-communication", msg), Self); - }); } protected override void PreStart() diff --git a/src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs b/src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs index ec850ee..5840859 100644 --- a/src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs +++ b/src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs @@ -50,10 +50,6 @@ public class InstanceActor : ReceiveActor private readonly Dictionary _alarmActors = new(); private FlattenedConfiguration? _configuration; - // WP-25: Debug view subscribers - private readonly HashSet _debugSubscriberCorrelationIds = new(); - private ActorSelection? _siteCommActor; - // DCL manager actor reference for subscribing to tag values private readonly IActorRef? _dclManager; // Maps tag paths back to attribute canonical names for DCL updates @@ -150,9 +146,6 @@ public class InstanceActor : ReceiveActor base.PreStart(); _logger.LogInformation("InstanceActor started for {Instance}", _instanceUniqueName); - // Resolve SiteCommunicationActor for routing debug stream events back to central - _siteCommActor = Context.ActorSelection("/user/site-communication"); - // Asynchronously load static overrides from SQLite and pipe to self var self = Self; _storage.GetStaticOverridesAsync(_instanceUniqueName).ContinueWith(t => @@ -370,12 +363,6 @@ public class InstanceActor : ReceiveActor // WP-23: Publish to site-wide stream _streamManager?.PublishAlarmStateChanged(changed); - - // Forward to debug subscribers via SiteCommunicationActor → ClusterClient → central - foreach (var correlationId in _debugSubscriberCorrelationIds) - { - _siteCommActor?.Tell(new DebugStreamEvent(correlationId, changed)); - } } /// @@ -383,9 +370,6 @@ public class InstanceActor : ReceiveActor /// private void HandleSubscribeDebugView(SubscribeDebugViewRequest request) { - var subscriptionId = request.CorrelationId; - _debugSubscriberCorrelationIds.Add(subscriptionId); - // Build snapshot from current state var now = DateTimeOffset.UtcNow; var attributeValues = _attributes.Select(kvp => new AttributeValueChanged( @@ -412,8 +396,8 @@ public class InstanceActor : ReceiveActor Sender.Tell(snapshot); _logger.LogDebug( - "Debug view subscriber added for {Instance}, subscriptionId={Id}", - _instanceUniqueName, subscriptionId); + "Debug view snapshot sent for {Instance}, correlationId={Id}", + _instanceUniqueName, request.CorrelationId); } /// @@ -421,10 +405,8 @@ public class InstanceActor : ReceiveActor /// private void HandleUnsubscribeDebugView(UnsubscribeDebugViewRequest request) { - _debugSubscriberCorrelationIds.Remove(request.CorrelationId); - _logger.LogDebug( - "Debug view subscriber removed for {Instance}, correlationId={Id}", + "Debug view unsubscribe for {Instance}, correlationId={Id}", _instanceUniqueName, request.CorrelationId); } @@ -478,12 +460,6 @@ public class InstanceActor : ReceiveActor { alarmActor.Tell(changed); } - - // Forward to debug subscribers via SiteCommunicationActor → ClusterClient → central - foreach (var correlationId in _debugSubscriberCorrelationIds) - { - _siteCommActor?.Tell(new DebugStreamEvent(correlationId, changed)); - } } /// diff --git a/tests/ScadaLink.Commons.Tests/ArchitecturalConstraintTests.cs b/tests/ScadaLink.Commons.Tests/ArchitecturalConstraintTests.cs index e3346d1..cb71799 100644 --- a/tests/ScadaLink.Commons.Tests/ArchitecturalConstraintTests.cs +++ b/tests/ScadaLink.Commons.Tests/ArchitecturalConstraintTests.cs @@ -136,6 +136,15 @@ public class ArchitecturalConstraintTests } } + [Fact] + public void DebugStreamEvent_ShouldNotExist() + { + // DebugStreamEvent was removed when debug streaming moved from ClusterClient to gRPC. + // Events now flow via SiteStreamManager → StreamRelayActor → gRPC channel. + var type = CommonsAssembly.GetTypes().FirstOrDefault(t => t.Name == "DebugStreamEvent"); + Assert.Null(type); + } + [Fact] public void AllEnums_ShouldBeSingularNamed() { diff --git a/tests/ScadaLink.SiteRuntime.Tests/Actors/InstanceActorIntegrationTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Actors/InstanceActorIntegrationTests.cs index c9a96b1..0bc84b1 100644 --- a/tests/ScadaLink.SiteRuntime.Tests/Actors/InstanceActorIntegrationTests.cs +++ b/tests/ScadaLink.SiteRuntime.Tests/Actors/InstanceActorIntegrationTests.cs @@ -41,21 +41,6 @@ public class InstanceActorIntegrationTests : TestKit, IDisposable ScriptExecutionTimeoutSeconds = 30 }; - // Create a fake site-communication actor that unwraps DebugStreamEvent - // and forwards the inner event to TestActor (simulating the ClusterClient relay) - Sys.ActorOf(Props.Create(() => new DebugStreamEventForwarder(TestActor)), "site-communication"); - } - - /// - /// Test helper: stands in for SiteCommunicationActor, unwraps DebugStreamEvent - /// and forwards the inner event to a target actor. - /// - private class DebugStreamEventForwarder : ReceiveActor - { - public DebugStreamEventForwarder(IActorRef target) - { - Receive(msg => target.Tell(msg.Event)); - } } void IDisposable.Dispose() @@ -169,22 +154,23 @@ public class InstanceActorIntegrationTests : TestKit, IDisposable } [Fact] - public void InstanceActor_WP25_DebugViewSubscriber_ReceivesChanges() + public void InstanceActor_WP25_DebugViewSubscribe_NoLongerForwardsEventsViaClusterClient() { + // Events now flow via gRPC (SiteStreamManager → StreamRelayActor → gRPC), + // not via ClusterClient. Subscribing returns a snapshot but ongoing events + // are NOT forwarded to the subscriber actor. var actor = CreateInstanceWithScripts("Pump1"); - // Subscribe to debug view + // Subscribe to debug view — should still get snapshot actor.Tell(new SubscribeDebugViewRequest("Pump1", "debug-2")); ExpectMsg(TimeSpan.FromSeconds(5)); - // Now change an attribute + // Change an attribute actor.Tell(new AttributeValueChanged( "Pump1", "Temperature", "Temperature", "200", "Good", DateTimeOffset.UtcNow)); - // The subscriber should receive the change notification - var changed = ExpectMsg(TimeSpan.FromSeconds(5)); - Assert.Equal("Temperature", changed.AttributeName); - Assert.Equal("200", changed.Value?.ToString()); + // Should NOT receive change notification (old ClusterClient path removed) + ExpectNoMsg(TimeSpan.FromSeconds(1)); } [Fact] @@ -223,20 +209,14 @@ public class InstanceActorIntegrationTests : TestKit, IDisposable var actor = CreateInstanceWithScripts("Pump1", alarms: alarms); - // Subscribe to debug view to observe alarm state changes - actor.Tell(new SubscribeDebugViewRequest("Pump1", "debug-alarm")); - ExpectMsg(TimeSpan.FromSeconds(5)); + // Wait for initialization + Thread.Sleep(500); - // Send value outside range to trigger alarm - actor.Tell(new AttributeValueChanged( - "Pump1", "Temperature", "Temperature", "150", "Good", DateTimeOffset.UtcNow)); + // Verify alarm actor was created by checking the debug snapshot includes the alarm + actor.Tell(new DebugSnapshotRequest("Pump1", "snap-alarm")); + var snapshot = ExpectMsg(TimeSpan.FromSeconds(5)); - // Should receive the attribute change first (from debug subscription) - ExpectMsg(TimeSpan.FromSeconds(5)); - - // Then the alarm state change (forwarded by Instance Actor) - var alarmMsg = ExpectMsg(TimeSpan.FromSeconds(5)); - Assert.Equal("HighTemp", alarmMsg.AlarmName); - Assert.Equal(Commons.Types.Enums.AlarmState.Active, alarmMsg.State); + Assert.Single(snapshot.AlarmStates); + Assert.Equal("HighTemp", snapshot.AlarmStates[0].AlarmName); } }