using System.Collections.Concurrent;
using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;

namespace ScadaLink.Communication;

/// <summary>
/// Manages debug stream sessions by creating DebugStreamBridgeActors that persist
/// as subscribers on the site side. Both the Blazor debug view and the SignalR hub
/// use this service to start/stop streams.
/// </summary>
public class DebugStreamService
{
    /// <summary>
    /// Upper bound on how long <see cref="StartStreamAsync"/> waits for the initial snapshot.
    /// </summary>
    private static readonly TimeSpan SnapshotTimeout = TimeSpan.FromSeconds(30);

    private readonly CommunicationService _communicationService;
    private readonly IServiceProvider _serviceProvider;
    private readonly SiteStreamGrpcClientFactory _grpcClientFactory;
    private readonly ILogger<DebugStreamService> _logger;

    // Active sessions: session id -> bridge actor that owns the site-side subscription.
    private readonly ConcurrentDictionary<string, IActorRef> _sessions = new();

    // Null until SetActorSystem is called; StartStreamAsync refuses to run before that.
    private ActorSystem? _actorSystem;

    /// <summary>
    /// Creates the service. The actor system is supplied later through
    /// <see cref="SetActorSystem"/>.
    /// </summary>
    public DebugStreamService(
        CommunicationService communicationService,
        IServiceProvider serviceProvider,
        SiteStreamGrpcClientFactory grpcClientFactory,
        ILogger<DebugStreamService> logger)
    {
        _communicationService = communicationService;
        _serviceProvider = serviceProvider;
        _grpcClientFactory = grpcClientFactory;
        _logger = logger;
    }

    /// <summary>
    /// Sets the ActorSystem reference. Called during actor system startup (from AkkaHostedService).
    /// </summary>
    /// <param name="actorSystem">The actor system in which bridge actors are created.</param>
    public void SetActorSystem(ActorSystem actorSystem)
    {
        _actorSystem = actorSystem;
    }

    /// <summary>
    /// Starts a debug stream session. Returns the initial snapshot.
    /// Ongoing events are delivered via the onEvent callback.
    /// The onTerminated callback fires if the stream is killed (site disconnect, timeout).
    /// </summary>
    /// <param name="instanceId">Id of the instance to stream; resolved to its unique name and site.</param>
    /// <param name="onEvent">
    /// Receives every stream event except the first <see cref="DebugViewSnapshot"/>, which is
    /// returned from this method instead. Non-snapshot events that arrive before the snapshot
    /// are forwarded immediately, i.e. possibly before this method returns.
    /// </param>
    /// <param name="onTerminated">
    /// Invoked when the stream terminates. It is also invoked when termination happens before
    /// the snapshot arrives, in which case this method additionally throws.
    /// </param>
    /// <param name="ct">Cancels the wait for the initial snapshot.</param>
    /// <returns>The new session id together with the initial snapshot.</returns>
    /// <exception cref="ArgumentNullException">A callback is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// The actor system has not been set, the instance or site cannot be found, the site has no
    /// gRPC node address configured, or the stream terminated before a snapshot was received.
    /// </exception>
    /// <exception cref="TimeoutException">No snapshot arrived within the snapshot timeout.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled while waiting.</exception>
    public async Task<DebugStreamSession> StartStreamAsync(
        int instanceId,
        Action<object> onEvent, // NOTE(review): type argument lost in extraction; 'object' inferred from the `evt is DebugViewSnapshot` test — confirm
        Action onTerminated,
        CancellationToken ct = default)
    {
        // Fail fast: a null callback would otherwise only blow up later, when it is invoked.
        ArgumentNullException.ThrowIfNull(onEvent);
        ArgumentNullException.ThrowIfNull(onTerminated);

        var system = _actorSystem
            ?? throw new InvalidOperationException("DebugStreamService not initialized. ActorSystem not set.");

        // Resolve instance → unique name + site
        string instanceUniqueName;
        string siteIdentifier;
        string grpcNodeAAddress;
        string grpcNodeBAddress;

        using (var scope = _serviceProvider.CreateScope())
        {
            // NOTE(review): the generic arguments of both GetRequiredService calls were lost in
            // extraction; ITemplateEngineRepository / ISiteRepository are assumed — confirm.
            var instanceRepo = scope.ServiceProvider.GetRequiredService<ITemplateEngineRepository>();
            var instance = await instanceRepo.GetInstanceByIdAsync(instanceId)
                ?? throw new InvalidOperationException($"Instance {instanceId} not found.");

            var siteRepo = scope.ServiceProvider.GetRequiredService<ISiteRepository>();
            var site = await siteRepo.GetSiteByIdAsync(instance.SiteId)
                ?? throw new InvalidOperationException($"Site {instance.SiteId} not found.");

            instanceUniqueName = instance.UniqueName;
            siteIdentifier = site.SiteIdentifier;
            grpcNodeAAddress = site.GrpcNodeAAddress
                ?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeAAddress configured.");
            grpcNodeBAddress = site.GrpcNodeBAddress
                ?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeBAddress configured.");
        }

        var sessionId = Guid.NewGuid().ToString("N");

        // Capture the initial snapshot via a TaskCompletionSource
        var snapshotTcs = new TaskCompletionSource<DebugViewSnapshot>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        Action<object> onEventWrapper = evt =>
        {
            // The first snapshot completes the pending start call. TrySetResult is atomic, so a
            // snapshot that loses to an earlier completion falls through to onEvent like any
            // other event.
            if (evt is DebugViewSnapshot initial && snapshotTcs.TrySetResult(initial))
            {
                return;
            }

            onEvent(evt);
        };

        Action onTerminatedWrapper = () =>
        {
            _sessions.TryRemove(sessionId, out _);

            // No-op when the snapshot has already been delivered.
            snapshotTcs.TrySetException(
                new InvalidOperationException("Debug stream terminated before snapshot received."));
            onTerminated();
        };

        // Create the bridge actor — use type-based Props to avoid expression tree limitations with closures
        var commActor = _communicationService.GetCommunicationActor();
        var props = Props.Create(
            typeof(DebugStreamBridgeActor),
            siteIdentifier,
            instanceUniqueName,
            sessionId,
            commActor,
            onEventWrapper,
            onTerminatedWrapper,
            _grpcClientFactory,
            grpcNodeAAddress,
            grpcNodeBAddress);

        var bridgeActor = system.ActorOf(props, $"debug-stream-{sessionId}");
        _sessions[sessionId] = bridgeActor;

        // Wait for the initial snapshot (with timeout)
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        timeoutCts.CancelAfter(SnapshotTimeout);

        DebugViewSnapshot snapshot;
        try
        {
            snapshot = await snapshotTcs.Task.WaitAsync(timeoutCts.Token);
        }
        catch (Exception ex)
        {
            // Any failure before the snapshot arrives — the timeout, caller cancellation, or the
            // stream terminating early (site disconnect / gRPC failure, surfaced by
            // onTerminatedWrapper as an InvalidOperationException) — must deterministically
            // tear down the bridge actor and its site-side subscription. Use the local
            // actor reference: a racing onTerminatedWrapper may already have removed the
            // session, which would make StopStream a no-op. StopDebugStream is idempotent
            // (the actor may already be stopping itself).
            _sessions.TryRemove(sessionId, out _);
            bridgeActor.Tell(new StopDebugStream());

            if (ex is OperationCanceledException)
            {
                // The linked token fires for both the caller's token and the timeout. Only the
                // latter is a timeout; caller cancellation is reported as cancellation.
                ct.ThrowIfCancellationRequested();

                throw new TimeoutException(
                    $"Timed out waiting for debug snapshot from {instanceUniqueName} on site {siteIdentifier}.");
            }

            throw new InvalidOperationException(
                $"Debug stream for {instanceUniqueName} on site {siteIdentifier} terminated before a snapshot was received.",
                ex);
        }

        _logger.LogInformation(
            "Debug stream {SessionId} started for {Instance} on site {Site}",
            sessionId,
            instanceUniqueName,
            siteIdentifier);

        return new DebugStreamSession(sessionId, snapshot);
    }

    /// <summary>
    /// Stops an active debug stream session.
    /// </summary>
    /// <param name="sessionId">
    /// Session id returned by <see cref="StartStreamAsync"/>. Unknown or already-stopped ids are ignored.
    /// </param>
    public void StopStream(string sessionId)
    {
        if (_sessions.TryRemove(sessionId, out var bridgeActor))
        {
            bridgeActor.Tell(new StopDebugStream());
            _logger.LogInformation("Debug stream {SessionId} stopped", sessionId);
        }
    }
}

/// <summary>
/// Result of starting a debug stream: the session id used to stop it, and the initial snapshot.
/// </summary>
public record DebugStreamSession(string SessionId, DebugViewSnapshot InitialSnapshot);