using Akka.Actor;
using Akka.Event;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Communication.Grpc;

namespace ScadaLink.Communication.Actors;

/// <summary>
/// Per-session actor (one per active debug session) on the central side.
/// Sends SubscribeDebugViewRequest to the site via CentralCommunicationActor (with THIS actor
/// as the Sender) to get the initial snapshot. After receiving the snapshot, opens a gRPC
/// server-streaming subscription via SiteStreamGrpcClient for ongoing events.
/// Stream events are marshalled back to the actor via Self.Tell for thread safety.
/// </summary>
/// <remarks>
/// On a gRPC stream error the actor unsubscribes the failed stream, flips between the
/// node A / node B endpoints and reconnects (first retry immediately, later ones after
/// <see cref="ReconnectDelay"/>). After <c>MaxRetries</c> consecutive errors without an
/// intervening event it gives up, invokes the termination callback and stops.
/// </remarks>
public class DebugStreamBridgeActor : ReceiveActor, IWithTimers
{
    private readonly ILoggingAdapter _log = Context.GetLogger();
    private readonly string _siteIdentifier;
    private readonly string _instanceUniqueName;
    private readonly string _correlationId;
    private readonly IActorRef _centralCommunicationActor;
    private readonly Action _onEvent;
    private readonly Action _onTerminated;
    private readonly SiteStreamGrpcClientFactory _grpcFactory;
    private readonly string _grpcNodeAAddress;
    private readonly string _grpcNodeBAddress;

    private const int MaxRetries = 3;
    private const string ReconnectTimerKey = "grpc-reconnect";

    /// <summary>Inactivity period after which an unattended session stops itself.</summary>
    private static readonly TimeSpan OrphanTimeout = TimeSpan.FromMinutes(5);

    /// <summary>Backoff before the second and later reconnect attempts (settable for tests).</summary>
    internal static TimeSpan ReconnectDelay { get; set; } = TimeSpan.FromSeconds(5);

    private int _retryCount;
    private bool _useNodeA = true;
    private bool _stopped;

    /// <summary>Cancels the currently running gRPC subscription task, if any.</summary>
    private CancellationTokenSource? _grpcCts;

    public ITimerScheduler Timers { get; set; } = null!;

    public DebugStreamBridgeActor(
        string siteIdentifier,
        string instanceUniqueName,
        string correlationId,
        IActorRef centralCommunicationActor,
        Action onEvent,
        Action onTerminated,
        SiteStreamGrpcClientFactory grpcFactory,
        string grpcNodeAAddress,
        string grpcNodeBAddress)
    {
        _siteIdentifier = siteIdentifier;
        _instanceUniqueName = instanceUniqueName;
        _correlationId = correlationId;
        _centralCommunicationActor = centralCommunicationActor;
        _onEvent = onEvent;
        _onTerminated = onTerminated;
        _grpcFactory = grpcFactory;
        _grpcNodeAAddress = grpcNodeAAddress;
        _grpcNodeBAddress = grpcNodeBAddress;

        // Initial snapshot response from the site (via ClusterClient)
        Receive(snapshot =>
        {
            _log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms)",
                _instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count);
            _onEvent(snapshot);
            OpenGrpcStream();
        });

        // Domain events arriving via Self.Tell from gRPC callback
        Receive(changed =>
        {
            _retryCount = 0; // Successful event resets retry count
            _onEvent(changed);
        });

        Receive(changed =>
        {
            _retryCount = 0;
            _onEvent(changed);
        });

        // gRPC stream error — attempt reconnection
        Receive(msg =>
        {
            _log.Warning("gRPC stream error for {0}: {1}", _instanceUniqueName, msg.Exception.Message);
            HandleGrpcError();
        });

        // Scheduled reconnection
        Receive(_ => OpenGrpcStream());

        // Consumer requests stop: release the stream and the site subscription, but do not
        // invoke the termination callback (the consumer initiated this).
        Receive(_ =>
        {
            _log.Info("Stopping debug stream for {0}", _instanceUniqueName);
            EndSession(unsubscribeSite: true, notifyTerminated: false);
        });

        // Site disconnected — CentralCommunicationActor notifies us. The site is gone, so no
        // unsubscribe request is sent to it.
        Receive(msg =>
        {
            if (_stopped) return; // Idempotent — gRPC error may arrive simultaneously

            _log.Warning("Debug stream terminated for {0} (site {1} disconnected)",
                _instanceUniqueName, msg.SiteId);
            EndSession(unsubscribeSite: false, notifyTerminated: true);
        });

        // Orphan safety net — Akka's receive timeout fires when NO message of any kind has
        // arrived for OrphanTimeout (every stream event resets it), so an abandoned session
        // eventually stops itself.
        // NOTE(review): a healthy session watching an instance that produces no events for
        // OrphanTimeout is also stopped by this — confirm that is intended.
        Context.SetReceiveTimeout(OrphanTimeout);
        Receive(_ =>
        {
            _log.Warning("Debug stream for {0} timed out (orphaned session), stopping", _instanceUniqueName);
            EndSession(unsubscribeSite: true, notifyTerminated: true);
        });
    }

    protected override void PreStart()
    {
        _log.Info("Starting debug stream bridge for {0} on site {1}", _instanceUniqueName, _siteIdentifier);

        // Send subscribe request via CentralCommunicationActor for the initial snapshot.
        var request = new SubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
        var envelope = new SiteEnvelope(_siteIdentifier, request);
        _centralCommunicationActor.Tell(envelope, Self);
    }

    protected override void PostStop()
    {
        CancelLocalStream();
        base.PostStop();
    }

    /// <summary>The gRPC endpoint currently selected for streaming (node A or node B).</summary>
    private string CurrentEndpoint => _useNodeA ? _grpcNodeAAddress : _grpcNodeBAddress;

    /// <summary>
    /// Starts a gRPC subscription against <see cref="CurrentEndpoint"/> on a background task,
    /// cancelling any previous one. Events and failures are marshalled back to the actor via Tell.
    /// </summary>
    private void OpenGrpcStream()
    {
        if (_stopped) return;

        var endpoint = CurrentEndpoint;
        _log.Info("Opening gRPC stream for {0} to {1}", _instanceUniqueName, endpoint);

        CancelLocalStream();
        var cts = new CancellationTokenSource();
        _grpcCts = cts;

        var client = _grpcFactory.GetOrCreate(_siteIdentifier, endpoint);
        var self = Self;
        var ct = cts.Token;

        // At most one GrpcStreamError per stream: a failure may be surfaced both through the
        // onError callback and by SubscribeAsync faulting, and must only count once toward
        // MaxRetries.
        var errorReported = 0;

        void ReportError(Exception ex)
        {
            // Once we have cancelled this stream (reconnect, cleanup, stop) any error it
            // surfaces is a consequence of that cancellation, not a new failure.
            if (ct.IsCancellationRequested) return;
            if (Interlocked.Exchange(ref errorReported, 1) != 0) return;
            self.Tell(new GrpcStreamError(ex));
        }

        // Launch as background task — onEvent and onError marshal back to actor via Tell.
        // The task is intentionally not awaited, so exceptions it throws are caught here and
        // routed to the actor instead of going unobserved (which would stall the session
        // without ever reconnecting).
        // NOTE(review): if SubscribeAsync returns normally (server completed the stream) no
        // reconnect is attempted — confirm the site never completes the stream deliberately.
        _ = Task.Run(async () =>
        {
            try
            {
                await client.SubscribeAsync(
                    _correlationId,
                    _instanceUniqueName,
                    evt => self.Tell(evt),
                    ex => ReportError(ex),
                    ct);
            }
            catch (Exception ex)
            {
                ReportError(ex);
            }
        }, ct);
    }

    /// <summary>
    /// Reacts to a failed stream: gives up after <c>MaxRetries</c> consecutive errors,
    /// otherwise unsubscribes the failed stream, switches node and schedules a reconnect.
    /// </summary>
    private void HandleGrpcError()
    {
        if (_stopped) return;

        _retryCount++;
        if (_retryCount > MaxRetries)
        {
            _log.Error("gRPC stream for {0} exceeded max retries ({1}), terminating",
                _instanceUniqueName, MaxRetries);
            // The site may still be reachable over ClusterClient even though gRPC keeps
            // failing, so release its debug-view subscription just like the orphan-timeout path.
            EndSession(unsubscribeSite: true, notifyTerminated: true);
            return;
        }

        // The failed stream is dead: cancel our token for it first so that any late error it
        // surfaces (including one provoked by the Unsubscribe below) is ignored by the
        // stream's error filter instead of being counted as another failure.
        CancelLocalStream();

        // Unsubscribe the failed stream on the *previous* endpoint before reconnecting.
        // This cancels the local subscription CTS and -- where the channel is still
        // alive -- propagates gRPC cancellation to the site so its SiteStreamGrpcServer
        // stops the StreamRelayActor for this correlation ID, rather than leaving a
        // zombie relay actor until TCP RST / keepalive eventually detects the loss.
        var previousClient = _grpcFactory.GetOrCreate(_siteIdentifier, CurrentEndpoint);
        previousClient.Unsubscribe(_correlationId);

        // Flip to the other node
        _useNodeA = !_useNodeA;

        // First retry is immediate, subsequent retries use a short backoff
        if (_retryCount == 1)
        {
            Self.Tell(new ReconnectGrpcStream());
        }
        else
        {
            Timers.StartSingleTimer(ReconnectTimerKey, new ReconnectGrpcStream(), ReconnectDelay);
        }
    }

    /// <summary>
    /// Shared teardown: releases the gRPC stream, optionally tells the site to drop its
    /// debug-view subscription, optionally notifies the consumer, and stops this actor.
    /// </summary>
    private void EndSession(bool unsubscribeSite, bool notifyTerminated)
    {
        CleanupGrpc();
        if (unsubscribeSite)
        {
            SendUnsubscribe();
        }

        _stopped = true;
        if (notifyTerminated)
        {
            _onTerminated();
        }

        Context.Stop(Self);
    }

    /// <summary>Cancels the local subscription task and unsubscribes from the current endpoint.</summary>
    private void CleanupGrpc()
    {
        CancelLocalStream();

        var client = _grpcFactory.GetOrCreate(_siteIdentifier, CurrentEndpoint);
        client.Unsubscribe(_correlationId);
    }

    /// <summary>Cancels and disposes the current subscription token source, if any.</summary>
    private void CancelLocalStream()
    {
        _grpcCts?.Cancel();
        _grpcCts?.Dispose();
        _grpcCts = null;
    }

    /// <summary>Tells the site (via CentralCommunicationActor) to drop this debug-view subscription.</summary>
    private void SendUnsubscribe()
    {
        var request = new UnsubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
        var envelope = new SiteEnvelope(_siteIdentifier, request);
        _centralCommunicationActor.Tell(envelope, Self);
    }
}

/// <summary>
/// Message sent to a DebugStreamBridgeActor to stop the debug stream session.
/// </summary>
public record StopDebugStream;

/// <summary>
/// Internal message indicating a gRPC stream error occurred.
/// </summary>
internal record GrpcStreamError(Exception Exception);

/// <summary>
/// Internal message to trigger gRPC stream reconnection.
/// </summary>
internal record ReconnectGrpcStream;