diff --git a/docs/requirements/Component-Communication.md b/docs/requirements/Component-Communication.md index 87c2d94..102fc80 100644 --- a/docs/requirements/Component-Communication.md +++ b/docs/requirements/Component-Communication.md @@ -51,9 +51,9 @@ Both central and site clusters. Each side has communication actors that handle m ### 6. Debug Streaming (Site → Central) - **Pattern**: Subscribe/push with initial snapshot (no polling). -- A **DebugStreamBridgeActor** (one per active debug session) is created on the central cluster by the **DebugStreamService**. The bridge actor sends a `SubscribeDebugViewRequest` to the site via `CentralCommunicationActor`, with itself as the `Sender`. The site's `InstanceActor` registers the bridge actor as the debug subscriber. -- Site requests a **snapshot** of all current attribute values and alarm states from the Instance Actor and sends it to the bridge actor. -- Site then subscribes to the **site-wide Akka stream** filtered by the instance's unique name and forwards `AttributeValueChanged` and `AlarmStateChanged` events to the bridge actor in real time via Akka remoting. +- A **DebugStreamBridgeActor** (one per active debug session) is created on the central cluster by the **DebugStreamService**. The bridge actor sends a `SubscribeDebugViewRequest` to the site via `CentralCommunicationActor`. The site's `InstanceActor` stores the subscription's correlation ID and replies with an initial snapshot via the ClusterClient reply path. +- Site requests a **snapshot** of all current attribute values and alarm states from the Instance Actor and sends it back to the bridge actor (via the ClusterClient reply path, which works for immediate responses). +- For ongoing events, the InstanceActor wraps `AttributeValueChanged` and `AlarmStateChanged` in a `DebugStreamEvent(correlationId, event)` message and sends it to the local `SiteCommunicationActor`. The SiteCommunicationActor forwards it to central via its own ClusterClient (`ClusterClient.Send("/user/central-communication", event)`). The `CentralCommunicationActor` looks up the bridge actor by correlation ID and delivers the event. This follows the same site→central pattern as health reports. - The bridge actor forwards received events to the consumer via callbacks (Blazor component or SignalR hub). - Attribute value stream messages: `[InstanceUniqueName].[AttributePath].[AttributeName]`, value, quality, timestamp. - Alarm state stream messages: `[InstanceUniqueName].[AlarmName]`, state (active/normal), priority, timestamp. diff --git a/src/ScadaLink.Commons/Messages/DebugView/DebugStreamEvent.cs b/src/ScadaLink.Commons/Messages/DebugView/DebugStreamEvent.cs new file mode 100644 index 0000000..d9afa01 --- /dev/null +++ b/src/ScadaLink.Commons/Messages/DebugView/DebugStreamEvent.cs @@ -0,0 +1,8 @@ +namespace ScadaLink.Commons.Messages.DebugView; + +/// +/// Wraps a debug stream event (AttributeValueChanged or AlarmStateChanged) with +/// the correlationId for routing back to the correct DebugStreamBridgeActor on central. +/// Sent from InstanceActor → SiteCommunicationActor → ClusterClient → CentralCommunicationActor. +/// +public record DebugStreamEvent(string CorrelationId, object Event); diff --git a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs index f31bde5..03b971c 100644 --- a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs @@ -85,6 +85,9 @@ public class CentralCommunicationActor : ReceiveActor // Route enveloped messages to sites Receive(HandleSiteEnvelope); + + // Route debug stream events from sites to the correct bridge actor + Receive(HandleDebugStreamEvent); } private void HandleHeartbeat(HeartbeatMessage heartbeat) @@ -93,6 +96,18 @@ public class CentralCommunicationActor : ReceiveActor Context.Parent.Tell(heartbeat); } + private void HandleDebugStreamEvent(Commons.Messages.DebugView.DebugStreamEvent msg) + { + if (_debugSubscriptions.TryGetValue(msg.CorrelationId, out var entry)) + { + entry.Subscriber.Tell(msg.Event); + } + else + { + _log.Debug("No debug subscription found for correlationId {0}, dropping event", msg.CorrelationId); + } + } + private void HandleSiteHealthReport(SiteHealthReport report) { var aggregator = _serviceProvider.GetService(); diff --git a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs index 1bcfd2d..faec802 100644 --- a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs @@ -145,6 +145,13 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers _centralClient?.Tell( new ClusterClient.Send("/user/central-communication", msg), Self); }); + + // Internal: forward debug stream events to central (site→central streaming) + Receive(msg => + { + _centralClient?.Tell( + new ClusterClient.Send("/user/central-communication", msg), Self); + }); } protected override void PreStart() diff --git a/src/ScadaLink.ManagementService/DebugStreamHub.cs b/src/ScadaLink.ManagementService/DebugStreamHub.cs index 6c3c0d3..a84e802 100644 --- a/src/ScadaLink.ManagementService/DebugStreamHub.cs +++ b/src/ScadaLink.ManagementService/DebugStreamHub.cs @@ -19,11 +19,16 @@ public class DebugStreamHub : Hub private const string SessionIdKey = "DebugStreamSessionId"; private readonly DebugStreamService _debugStreamService; + private readonly IHubContext _hubContext; private readonly ILogger _logger; - public DebugStreamHub(DebugStreamService debugStreamService, ILogger logger) + public DebugStreamHub( + DebugStreamService debugStreamService, + IHubContext hubContext, + ILogger logger) { _debugStreamService = debugStreamService; + _hubContext = hubContext; _logger = logger; } @@ -105,6 +110,10 @@ public class DebugStreamHub : Hub try { + // Use IHubContext for callbacks — the hub instance is transient (disposed after method returns), + // but IHubContext is a singleton that remains valid for the lifetime of the connection. + var hubClients = _hubContext.Clients; + var session = await _debugStreamService.StartStreamAsync( instanceId, onEvent: evt => @@ -113,17 +122,17 @@ public class DebugStreamHub : Hub _ = evt switch { AttributeValueChanged changed => - Clients.Client(connectionId).SendAsync("OnAttributeChanged", changed), + hubClients.Client(connectionId).SendAsync("OnAttributeChanged", changed), AlarmStateChanged changed => - Clients.Client(connectionId).SendAsync("OnAlarmChanged", changed), + hubClients.Client(connectionId).SendAsync("OnAlarmChanged", changed), DebugViewSnapshot snapshot => - Clients.Client(connectionId).SendAsync("OnSnapshot", snapshot), + hubClients.Client(connectionId).SendAsync("OnSnapshot", snapshot), _ => Task.CompletedTask }; }, onTerminated: () => { - _ = Clients.Client(connectionId).SendAsync("OnStreamTerminated", "Site disconnected"); + _ = hubClients.Client(connectionId).SendAsync("OnStreamTerminated", "Site disconnected"); }); Context.Items[SessionIdKey] = session.SessionId; diff --git a/src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs b/src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs index 49ed206..ec850ee 100644 --- a/src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs +++ b/src/ScadaLink.SiteRuntime/Actors/InstanceActor.cs @@ -51,7 +51,8 @@ public class InstanceActor : ReceiveActor private FlattenedConfiguration? _configuration; // WP-25: Debug view subscribers - private readonly Dictionary _debugSubscribers = new(); + private readonly HashSet _debugSubscriberCorrelationIds = new(); + private ActorSelection? _siteCommActor; // DCL manager actor reference for subscribing to tag values private readonly IActorRef? _dclManager; @@ -149,6 +150,9 @@ public class InstanceActor : ReceiveActor base.PreStart(); _logger.LogInformation("InstanceActor started for {Instance}", _instanceUniqueName); + // Resolve SiteCommunicationActor for routing debug stream events back to central + _siteCommActor = Context.ActorSelection("/user/site-communication"); + // Asynchronously load static overrides from SQLite and pipe to self var self = Self; _storage.GetStaticOverridesAsync(_instanceUniqueName).ContinueWith(t => @@ -367,10 +371,10 @@ public class InstanceActor : ReceiveActor // WP-23: Publish to site-wide stream _streamManager?.PublishAlarmStateChanged(changed); - // Forward to debug subscribers - foreach (var sub in _debugSubscribers.Values) + // Forward to debug subscribers via SiteCommunicationActor → ClusterClient → central + foreach (var correlationId in _debugSubscriberCorrelationIds) { - sub.Tell(changed); + _siteCommActor?.Tell(new DebugStreamEvent(correlationId, changed)); } } @@ -380,7 +384,7 @@ public class InstanceActor : ReceiveActor private void HandleSubscribeDebugView(SubscribeDebugViewRequest request) { var subscriptionId = request.CorrelationId; - _debugSubscribers[subscriptionId] = Sender; + _debugSubscriberCorrelationIds.Add(subscriptionId); // Build snapshot from current state var now = DateTimeOffset.UtcNow; @@ -407,9 +411,6 @@ public class InstanceActor : ReceiveActor Sender.Tell(snapshot); - // Also register with stream manager for filtered events - _streamManager?.Subscribe(_instanceUniqueName, Sender); - _logger.LogDebug( "Debug view subscriber added for {Instance}, subscriptionId={Id}", _instanceUniqueName, subscriptionId); @@ -420,8 +421,7 @@ public class InstanceActor : ReceiveActor /// private void HandleUnsubscribeDebugView(UnsubscribeDebugViewRequest request) { - _debugSubscribers.Remove(request.CorrelationId); - _streamManager?.RemoveSubscriber(Sender); + _debugSubscriberCorrelationIds.Remove(request.CorrelationId); _logger.LogDebug( "Debug view subscriber removed for {Instance}, correlationId={Id}", @@ -479,10 +479,10 @@ public class InstanceActor : ReceiveActor alarmActor.Tell(changed); } - // Forward to debug subscribers - foreach (var sub in _debugSubscribers.Values) + // Forward to debug subscribers via SiteCommunicationActor → ClusterClient → central + foreach (var correlationId in _debugSubscriberCorrelationIds) { - sub.Tell(changed); + _siteCommActor?.Tell(new DebugStreamEvent(correlationId, changed)); } } diff --git a/tests/ScadaLink.SiteRuntime.Tests/Actors/InstanceActorIntegrationTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Actors/InstanceActorIntegrationTests.cs index 10899fb..c9a96b1 100644 --- a/tests/ScadaLink.SiteRuntime.Tests/Actors/InstanceActorIntegrationTests.cs +++ b/tests/ScadaLink.SiteRuntime.Tests/Actors/InstanceActorIntegrationTests.cs @@ -40,6 +40,22 @@ public class InstanceActorIntegrationTests : TestKit, IDisposable MaxScriptCallDepth = 10, ScriptExecutionTimeoutSeconds = 30 }; + + // Create a fake site-communication actor that unwraps DebugStreamEvent + // and forwards the inner event to TestActor (simulating the ClusterClient relay) + Sys.ActorOf(Props.Create(() => new DebugStreamEventForwarder(TestActor)), "site-communication"); + } + + /// + /// Test helper: stands in for SiteCommunicationActor, unwraps DebugStreamEvent + /// and forwards the inner event to a target actor. + /// + private class DebugStreamEventForwarder : ReceiveActor + { + public DebugStreamEventForwarder(IActorRef target) + { + Receive(msg => target.Tell(msg.Event)); + } } void IDisposable.Dispose()