Files
ScadaBridge/src/ScadaLink.Communication/DebugStreamService.cs
T
Joseph Doherty 2cd43b6992 feat: update DebugStreamBridgeActor to use gRPC for streaming events
After receiving the initial snapshot via ClusterClient, the bridge actor
now opens a gRPC server-streaming subscription via SiteStreamGrpcClient
for ongoing AttributeValueChanged/AlarmStateChanged events. Adds NodeA/
NodeB failover with max 3 retries, retry count reset on successful event,
and IWithTimers-based reconnect scheduling.

- DebugStreamBridgeActor: gRPC stream after snapshot, reconnect state machine
- DebugStreamService: inject SiteStreamGrpcClientFactory, resolve gRPC addresses
- ServiceCollectionExtensions: register SiteStreamGrpcClientFactory singleton
- SiteStreamGrpcClient: make SubscribeAsync/Unsubscribe virtual for testability
- SiteStreamGrpcClientFactory: make GetOrCreate virtual for testability
- New test suite: DebugStreamBridgeActorTests (8 tests)
2026-03-21 12:14:24 -04:00

160 lines
6.0 KiB
C#

using System.Collections.Concurrent;
using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication;
/// <summary>
/// Manages debug stream sessions by creating DebugStreamBridgeActors that persist
/// as subscribers on the site side. Both the Blazor debug view and the SignalR hub
/// use this service to start/stop streams.
/// </summary>
public class DebugStreamService
{
    /// <summary>
    /// Maximum time <see cref="StartStreamAsync"/> waits for the initial snapshot
    /// before the session is stopped and a <see cref="TimeoutException"/> is thrown.
    /// </summary>
    private static readonly TimeSpan SnapshotTimeout = TimeSpan.FromSeconds(30);

    private readonly CommunicationService _communicationService;
    private readonly IServiceProvider _serviceProvider;
    private readonly SiteStreamGrpcClientFactory _grpcClientFactory;
    private readonly ILogger<DebugStreamService> _logger;

    // Active sessions keyed by session id; the value is the bridge actor created for that session.
    private readonly ConcurrentDictionary<string, IActorRef> _sessions = new();

    // Null until SetActorSystem is called; StartStreamAsync refuses to run before that.
    private ActorSystem? _actorSystem;

    /// <summary>
    /// Creates the service. The <paramref name="serviceProvider"/> is used to open a scope per
    /// <see cref="StartStreamAsync"/> call for resolving the repositories.
    /// </summary>
    public DebugStreamService(
        CommunicationService communicationService,
        IServiceProvider serviceProvider,
        SiteStreamGrpcClientFactory grpcClientFactory,
        ILogger<DebugStreamService> logger)
    {
        _communicationService = communicationService;
        _serviceProvider = serviceProvider;
        _grpcClientFactory = grpcClientFactory;
        _logger = logger;
    }

    /// <summary>
    /// Sets the ActorSystem reference. Called during actor system startup (from AkkaHostedService).
    /// </summary>
    public void SetActorSystem(ActorSystem actorSystem)
    {
        _actorSystem = actorSystem;
    }

    /// <summary>
    /// Starts a debug stream session. Returns the initial snapshot.
    /// Ongoing events are delivered via the onEvent callback.
    /// The onTerminated callback fires if the stream is killed (site disconnect, timeout).
    /// </summary>
    /// <param name="instanceId">Id of the instance to stream; resolved to its unique name and site.</param>
    /// <param name="onEvent">
    /// Receives every event except the first <see cref="DebugViewSnapshot"/>, which is returned
    /// from this method instead. Events that arrive before that first snapshot are forwarded too.
    /// </param>
    /// <param name="onTerminated">Invoked after the session has been removed from the active set.</param>
    /// <param name="ct">Cancels the wait for the initial snapshot.</param>
    /// <returns>The new session id together with the initial snapshot.</returns>
    /// <exception cref="InvalidOperationException">
    /// The ActorSystem has not been set, the instance or site was not found, a gRPC node address
    /// is not configured, or the stream terminated before the snapshot was received.
    /// </exception>
    /// <exception cref="TimeoutException">No snapshot arrived within the snapshot timeout.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled while waiting.</exception>
    public async Task<DebugStreamSession> StartStreamAsync(
        int instanceId,
        Action<object> onEvent,
        Action onTerminated,
        CancellationToken ct = default)
    {
        var system = _actorSystem
            ?? throw new InvalidOperationException("DebugStreamService not initialized. ActorSystem not set.");

        // Resolve instance → unique name + site
        string instanceUniqueName;
        string siteIdentifier;
        string grpcNodeAAddress;
        string grpcNodeBAddress;
        using (var scope = _serviceProvider.CreateScope())
        {
            var instanceRepo = scope.ServiceProvider.GetRequiredService<ITemplateEngineRepository>();
            var instance = await instanceRepo.GetInstanceByIdAsync(instanceId)
                ?? throw new InvalidOperationException($"Instance {instanceId} not found.");

            var siteRepo = scope.ServiceProvider.GetRequiredService<ISiteRepository>();
            var site = await siteRepo.GetSiteByIdAsync(instance.SiteId)
                ?? throw new InvalidOperationException($"Site {instance.SiteId} not found.");

            instanceUniqueName = instance.UniqueName;
            siteIdentifier = site.SiteIdentifier;
            grpcNodeAAddress = site.GrpcNodeAAddress
                ?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeAAddress configured.");
            grpcNodeBAddress = site.GrpcNodeBAddress
                ?? throw new InvalidOperationException($"Site {siteIdentifier} has no GrpcNodeBAddress configured.");
        }

        var sessionId = Guid.NewGuid().ToString("N");

        // Capture the initial snapshot via a TaskCompletionSource.
        // RunContinuationsAsynchronously keeps this method's continuation from running inline
        // on whichever thread invokes the callback that completes the source.
        var snapshotTcs = new TaskCompletionSource<DebugViewSnapshot>(TaskCreationOptions.RunContinuationsAsynchronously);

        Action<object> onEventWrapper = evt =>
        {
            // Only the first snapshot completes the task; later snapshots and all other
            // events go to the caller's callback.
            if (evt is DebugViewSnapshot snapshot && !snapshotTcs.Task.IsCompleted)
            {
                snapshotTcs.TrySetResult(snapshot);
            }
            else
            {
                onEvent(evt);
            }
        };

        Action onTerminatedWrapper = () =>
        {
            _sessions.TryRemove(sessionId, out _);
            // No effect if the snapshot was already delivered; otherwise fails the pending wait.
            snapshotTcs.TrySetException(new InvalidOperationException("Debug stream terminated before snapshot received."));
            onTerminated();
        };

        // Create the bridge actor — use type-based Props to avoid expression tree limitations with closures
        var commActor = _communicationService.GetCommunicationActor();
        var props = Props.Create(typeof(DebugStreamBridgeActor),
            siteIdentifier,
            instanceUniqueName,
            sessionId,
            commActor,
            onEventWrapper,
            onTerminatedWrapper,
            _grpcClientFactory,
            grpcNodeAAddress,
            grpcNodeBAddress);

        var bridgeActor = system.ActorOf(props, $"debug-stream-{sessionId}");
        _sessions[sessionId] = bridgeActor;

        // Wait for the initial snapshot (with timeout). The linked source fires on either the
        // caller's token or the timeout, so the catch blocks below must tell the two apart.
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        timeoutCts.CancelAfter(SnapshotTimeout);

        try
        {
            var snapshot = await snapshotTcs.Task.WaitAsync(timeoutCts.Token);
            _logger.LogInformation("Debug stream {SessionId} started for {Instance} on site {Site}",
                sessionId, instanceUniqueName, siteIdentifier);
            return new DebugStreamSession(sessionId, snapshot);
        }
        catch (OperationCanceledException ex) when (ct.IsCancellationRequested)
        {
            // The caller cancelled: clean up, but report cancellation rather than a timeout,
            // and bind the exception to the caller's token instead of the internal linked one.
            StopStream(sessionId);
            throw new OperationCanceledException(
                $"Debug stream start for {instanceUniqueName} on site {siteIdentifier} was cancelled.", ex, ct);
        }
        catch (OperationCanceledException)
        {
            // Only the timeout can have fired here.
            StopStream(sessionId);
            throw new TimeoutException($"Timed out waiting for debug snapshot from {instanceUniqueName} on site {siteIdentifier}.");
        }
    }

    /// <summary>
    /// Stops an active debug stream session by sending <c>StopDebugStream</c> to its bridge actor.
    /// Does nothing if <paramref name="sessionId"/> is not an active session.
    /// </summary>
    /// <param name="sessionId">Session id previously returned by <see cref="StartStreamAsync"/>.</param>
    public void StopStream(string sessionId)
    {
        if (_sessions.TryRemove(sessionId, out var bridgeActor))
        {
            bridgeActor.Tell(new StopDebugStream());
            _logger.LogInformation("Debug stream {SessionId} stopped", sessionId);
        }
    }
}
/// <summary>
/// Result of <see cref="DebugStreamService.StartStreamAsync"/>: identifies the started session
/// and carries the snapshot that was received first.
/// </summary>
/// <param name="SessionId">Session identifier; pass it to <see cref="DebugStreamService.StopStream"/> to stop the stream.</param>
/// <param name="InitialSnapshot">The first <see cref="DebugViewSnapshot"/> received for the session.</param>
public record DebugStreamSession(string SessionId, DebugViewSnapshot InitialSnapshot);