feat: replace debug view polling with real-time SignalR streaming

The debug view polled every 2s by re-subscribing for full snapshots. Now a
persistent DebugStreamBridgeActor on central subscribes once and receives
incremental Akka stream events from the site, forwarding them to the Blazor
component via callbacks and to the CLI via a new SignalR hub at
/hubs/debug-stream. Adds `debug stream` CLI command with auto-reconnect.
This commit is contained in:
Joseph Doherty
2026-03-21 01:34:53 -04:00
parent d91aa83665
commit fd2e96fea2
15 changed files with 777 additions and 75 deletions

View File

@@ -1,5 +1,9 @@
using System.CommandLine;
using System.CommandLine.Parsing;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using Microsoft.AspNetCore.SignalR.Client;
using ScadaLink.Commons.Messages.Management;
namespace ScadaLink.CLI.Commands;
@@ -11,6 +15,7 @@ public static class DebugCommands
var command = new Command("debug") { Description = "Runtime debugging" };
command.Add(BuildSnapshot(urlOption, formatOption, usernameOption, passwordOption));
command.Add(BuildStream(urlOption, formatOption, usernameOption, passwordOption));
return command;
}
@@ -28,4 +33,230 @@ public static class DebugCommands
});
return cmd;
}
private static Command BuildStream(Option<string> urlOption, Option<string> formatOption, Option<string> usernameOption, Option<string> passwordOption)
{
var idOption = new Option<int>("--id") { Description = "Instance ID", Required = true };
var cmd = new Command("stream") { Description = "Stream live attribute values and alarm states in real-time (Ctrl+C to stop)" };
cmd.Add(idOption);
cmd.SetAction(async (ParseResult result) =>
{
var instanceId = result.GetValue(idOption);
var format = result.GetValue(formatOption) ?? "json";
var config = CliConfig.Load();
var url = result.GetValue(urlOption);
if (string.IsNullOrWhiteSpace(url))
url = config.ManagementUrl;
if (string.IsNullOrWhiteSpace(url))
{
OutputFormatter.WriteError(
"No management URL specified. Use --url, set SCADALINK_MANAGEMENT_URL, or add 'managementUrl' to ~/.scadalink/config.json.",
"NO_URL");
return 1;
}
var username = result.GetValue(usernameOption);
var password = result.GetValue(passwordOption);
if (string.IsNullOrWhiteSpace(username) || string.IsNullOrWhiteSpace(password))
{
OutputFormatter.WriteError(
"Credentials required. Use --username and --password options.",
"NO_CREDENTIALS");
return 1;
}
return await StreamDebugAsync(url, username, password, instanceId, format);
});
return cmd;
}
private static async Task<int> StreamDebugAsync(string baseUrl, string username, string password, int instanceId, string format)
{
var hubUrl = baseUrl.TrimEnd('/') + "/hubs/debug-stream";
var credentials = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"));
var connection = new HubConnectionBuilder()
.WithUrl(hubUrl, options =>
{
options.Headers.Add("Authorization", $"Basic {credentials}");
})
.WithAutomaticReconnect()
.Build();
var cts = new CancellationTokenSource();
var exitTcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cts.Cancel();
};
var isTable = string.Equals(format, "table", StringComparison.OrdinalIgnoreCase);
// Register event handlers
connection.On<JsonElement>("OnSnapshot", snapshot =>
{
if (isTable)
{
Console.WriteLine("=== Initial Snapshot ===");
PrintSnapshotTable(snapshot);
Console.WriteLine("=== Streaming (Ctrl+C to stop) ===");
}
else
{
var obj = new { type = "snapshot", data = snapshot };
Console.WriteLine(JsonSerializer.Serialize(obj, new JsonSerializerOptions { WriteIndented = false }));
}
});
connection.On<JsonElement>("OnAttributeChanged", changed =>
{
if (isTable)
{
var name = changed.TryGetProperty("attributeName", out var n) ? n.GetString() : "?";
var value = changed.TryGetProperty("value", out var v) ? v.ToString() : "?";
var quality = changed.TryGetProperty("quality", out var q) ? q.GetString() : "?";
var ts = changed.TryGetProperty("timestamp", out var t) ? t.GetString() : "?";
Console.WriteLine($" ATTR {name,-30} {value,-20} {quality,-10} {ts}");
}
else
{
var obj = new { type = "attributeChanged", data = changed };
Console.WriteLine(JsonSerializer.Serialize(obj, new JsonSerializerOptions { WriteIndented = false }));
}
});
connection.On<JsonElement>("OnAlarmChanged", changed =>
{
if (isTable)
{
var name = changed.TryGetProperty("alarmName", out var n) ? n.GetString() : "?";
var state = changed.TryGetProperty("state", out var s) ? s.ToString() : "?";
var priority = changed.TryGetProperty("priority", out var p) ? p.ToString() : "?";
var ts = changed.TryGetProperty("timestamp", out var t) ? t.GetString() : "?";
Console.WriteLine($" ALARM {name,-30} {state,-20} P{priority,-9} {ts}");
}
else
{
var obj = new { type = "alarmChanged", data = changed };
Console.WriteLine(JsonSerializer.Serialize(obj, new JsonSerializerOptions { WriteIndented = false }));
}
});
connection.On<string>("OnStreamTerminated", reason =>
{
Console.Error.WriteLine($"Stream terminated: {reason}");
exitTcs.TrySetResult(1);
});
connection.Closed += ex =>
{
if (!cts.IsCancellationRequested)
{
Console.Error.WriteLine(ex != null
? $"Connection lost: {ex.Message}"
: "Connection closed.");
}
exitTcs.TrySetResult(cts.IsCancellationRequested ? 0 : 1);
return Task.CompletedTask;
};
connection.Reconnecting += ex =>
{
Console.Error.WriteLine($"Reconnecting... ({ex?.Message})");
return Task.CompletedTask;
};
connection.Reconnected += _ =>
{
Console.Error.WriteLine("Reconnected. Re-subscribing...");
return connection.InvokeAsync("SubscribeInstance", instanceId);
};
// Connect and subscribe
try
{
await connection.StartAsync(cts.Token);
}
catch (Exception ex)
{
OutputFormatter.WriteError($"Connection failed: {ex.Message}", "CONNECTION_FAILED");
return 1;
}
try
{
await connection.InvokeAsync("SubscribeInstance", instanceId, cts.Token);
}
catch (Exception ex)
{
OutputFormatter.WriteError($"Subscribe failed: {ex.Message}", "SUBSCRIBE_FAILED");
await connection.DisposeAsync();
return 1;
}
if (isTable)
{
Console.WriteLine($"Connected to instance {instanceId}. Waiting for data...");
}
// Wait for cancellation (Ctrl+C) or stream termination
try
{
await exitTcs.Task.WaitAsync(cts.Token);
}
catch (OperationCanceledException)
{
// Ctrl+C — graceful shutdown
}
try
{
await connection.InvokeAsync("UnsubscribeInstance");
}
catch
{
// Best effort
}
await connection.DisposeAsync();
return exitTcs.Task.IsCompletedSuccessfully ? exitTcs.Task.Result : 0;
}
private static void PrintSnapshotTable(JsonElement snapshot)
{
if (snapshot.TryGetProperty("attributeValues", out var attrs) && attrs.ValueKind == JsonValueKind.Array)
{
Console.WriteLine(" Attributes:");
Console.WriteLine($" {"Name",-30} {"Value",-20} {"Quality",-10} Timestamp");
Console.WriteLine($" {new string('-', 30)} {new string('-', 20)} {new string('-', 10)} {new string('-', 25)}");
foreach (var av in attrs.EnumerateArray())
{
var name = av.TryGetProperty("attributeName", out var n) ? n.GetString() : "?";
var value = av.TryGetProperty("value", out var v) ? v.ToString() : "?";
var quality = av.TryGetProperty("quality", out var q) ? q.GetString() : "?";
var ts = av.TryGetProperty("timestamp", out var t) ? t.GetString() : "?";
Console.WriteLine($" {name,-30} {value,-20} {quality,-10} {ts}");
}
}
if (snapshot.TryGetProperty("alarmStates", out var alarms) && alarms.ValueKind == JsonValueKind.Array)
{
Console.WriteLine(" Alarms:");
Console.WriteLine($" {"Name",-30} {"State",-20} {"Priority",-10} Timestamp");
Console.WriteLine($" {new string('-', 30)} {new string('-', 20)} {new string('-', 10)} {new string('-', 25)}");
foreach (var al in alarms.EnumerateArray())
{
var name = al.TryGetProperty("alarmName", out var n) ? n.GetString() : "?";
var state = al.TryGetProperty("state", out var s) ? s.ToString() : "?";
var priority = al.TryGetProperty("priority", out var p) ? p.ToString() : "?";
var ts = al.TryGetProperty("timestamp", out var t) ? t.GetString() : "?";
Console.WriteLine($" {name,-30} {state,-20} P{priority,-9} {ts}");
}
}
}
}

View File

@@ -1038,6 +1038,25 @@ scadalink --url <url> debug snapshot --id <int>
The command resolves the instance's site internally and routes the request to the correct site cluster. Returns all attribute values (name, value, quality, timestamp) and alarm states (name, state, priority, timestamp) at the moment the request reaches the site.
#### `debug stream`
Stream live attribute values and alarm state changes in real-time using a SignalR WebSocket connection to the central server's `/hubs/debug-stream` hub. Events are printed as they arrive. Press Ctrl+C to disconnect.
```sh
scadalink --url <url> debug stream --id <int>
```
| Option | Required | Description |
|--------|----------|-------------|
| `--id` | yes | Instance ID |
The default JSON format outputs one NDJSON object per event line with a `type` field (`snapshot`, `attributeChanged`, or `alarmChanged`). Table format (`--format table`) shows a formatted initial snapshot followed by streaming rows prefixed with `ATTR` or `ALARM`.
Features:
- Automatic reconnection on connection loss with re-subscribe.
- Works through the Traefik load balancer (WebSocket upgrade proxied natively).
- Requires the `Deployment` role.
---
### `audit-log` — Audit log queries

View File

@@ -11,6 +11,7 @@
<InternalsVisibleTo Include="ScadaLink.CLI.Tests" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.SignalR.Client" Version="9.0.3" />
<PackageReference Include="System.CommandLine" Version="2.0.5" />
</ItemGroup>
<ItemGroup>

View File

@@ -10,7 +10,7 @@
@attribute [Authorize(Policy = AuthorizationPolicies.RequireDeployment)]
@inject ITemplateEngineRepository TemplateEngineRepository
@inject ISiteRepository SiteRepository
@inject CommunicationService CommunicationService
@inject DebugStreamService DebugStreamService
@inject IJSRuntime JS
@implements IDisposable
@@ -38,11 +38,11 @@
</div>
<div class="col-md-4">
<label class="form-label small">Instance</label>
<select class="form-select form-select-sm" @bind="_selectedInstanceName">
<option value="">Select instance...</option>
<select class="form-select form-select-sm" @bind="_selectedInstanceId" @bind:after="OnInstanceSelectionChanged">
<option value="0">Select instance...</option>
@foreach (var inst in _siteInstances)
{
<option value="@inst.UniqueName">@inst.UniqueName (@inst.State)</option>
<option value="@inst.Id">@inst.UniqueName (@inst.State)</option>
}
</select>
</div>
@@ -50,7 +50,7 @@
@if (!_connected)
{
<button class="btn btn-primary btn-sm" @onclick="Connect"
disabled="@(string.IsNullOrEmpty(_selectedInstanceName) || _selectedSiteId == 0 || _connecting)">
disabled="@(_selectedInstanceId == 0 || _selectedSiteId == 0 || _connecting)">
@if (_connecting) { <span class="spinner-border spinner-border-sm me-1"></span> }
Connect
</button>
@@ -58,7 +58,10 @@
else
{
<button class="btn btn-outline-danger btn-sm" @onclick="Disconnect">Disconnect</button>
<span class="badge bg-success align-self-center">Connected</span>
<span class="badge bg-success align-self-center">
<span class="spinner-grow spinner-grow-sm me-1" style="width: 0.5rem; height: 0.5rem;"></span>
Live
</span>
}
</div>
</div>
@@ -153,7 +156,7 @@
private List<Site> _sites = new();
private List<Instance> _siteInstances = new();
private int _selectedSiteId;
private string _selectedInstanceName = string.Empty;
private int _selectedInstanceId;
private bool _loading = true;
private bool _connected;
private bool _connecting;
@@ -162,7 +165,7 @@
private Dictionary<string, AttributeValueChanged> _attributeValues = new();
private Dictionary<string, AlarmStateChanged> _alarmStates = new();
private Timer? _refreshTimer;
private DebugStreamSession? _session;
private ToastNotification _toast = default!;
protected override async Task OnInitializedAsync()
@@ -183,14 +186,14 @@
if (!firstRender) return;
var storedSiteId = await JS.InvokeAsync<string>("localStorage.getItem", "debugView.siteId");
var storedInstanceName = await JS.InvokeAsync<string>("localStorage.getItem", "debugView.instanceName");
var storedInstanceId = await JS.InvokeAsync<string>("localStorage.getItem", "debugView.instanceId");
if (!string.IsNullOrEmpty(storedSiteId) && int.TryParse(storedSiteId, out var siteId)
&& !string.IsNullOrEmpty(storedInstanceName))
&& !string.IsNullOrEmpty(storedInstanceId) && int.TryParse(storedInstanceId, out var instanceId))
{
_selectedSiteId = siteId;
await LoadInstancesForSite();
_selectedInstanceName = storedInstanceName;
_selectedInstanceId = instanceId;
StateHasChanged();
await Connect();
}
@@ -199,7 +202,7 @@
private async Task LoadInstancesForSite()
{
_siteInstances.Clear();
_selectedInstanceName = string.Empty;
_selectedInstanceId = 0;
if (_selectedSiteId == 0) return;
try
{
@@ -213,58 +216,64 @@
}
}
private void OnInstanceSelectionChanged()
{
// No-op; selection is tracked via _selectedInstanceId binding
}
private async Task Connect()
{
if (string.IsNullOrEmpty(_selectedInstanceName) || _selectedSiteId == 0) return;
if (_selectedInstanceId == 0 || _selectedSiteId == 0) return;
_connecting = true;
try
{
var site = _sites.FirstOrDefault(s => s.Id == _selectedSiteId);
if (site == null) return;
var session = await DebugStreamService.StartStreamAsync(
_selectedInstanceId,
onEvent: evt =>
{
switch (evt)
{
case AttributeValueChanged av:
_attributeValues[av.AttributeName] = av;
_ = InvokeAsync(StateHasChanged);
break;
case AlarmStateChanged al:
_alarmStates[al.AlarmName] = al;
_ = InvokeAsync(StateHasChanged);
break;
}
},
onTerminated: () =>
{
_connected = false;
_session = null;
_ = InvokeAsync(() =>
{
_toast.ShowError("Debug stream terminated (site disconnected).");
StateHasChanged();
});
});
var request = new SubscribeDebugViewRequest(_selectedInstanceName, Guid.NewGuid().ToString("N"));
_snapshot = await CommunicationService.SubscribeDebugViewAsync(site.SiteIdentifier, request);
_session = session;
// Populate initial state from snapshot
_attributeValues.Clear();
foreach (var av in _snapshot.AttributeValues)
{
foreach (var av in session.InitialSnapshot.AttributeValues)
_attributeValues[av.AttributeName] = av;
}
_alarmStates.Clear();
foreach (var al in _snapshot.AlarmStates)
{
_alarmStates[al.AlarmName] = al;
}
_alarmStates.Clear();
foreach (var al in session.InitialSnapshot.AlarmStates)
_alarmStates[al.AlarmName] = al;
_snapshot = session.InitialSnapshot;
_connected = true;
// Persist selection to localStorage for auto-reconnect on refresh
await JS.InvokeVoidAsync("localStorage.setItem", "debugView.siteId", _selectedSiteId.ToString());
await JS.InvokeVoidAsync("localStorage.setItem", "debugView.instanceName", _selectedInstanceName);
await JS.InvokeVoidAsync("localStorage.setItem", "debugView.siteIdentifier", site.SiteIdentifier);
await JS.InvokeVoidAsync("localStorage.setItem", "debugView.instanceId", _selectedInstanceId.ToString());
_toast.ShowSuccess($"Connected to {_selectedInstanceName}");
// Periodic refresh (simulating SignalR push by re-subscribing)
_refreshTimer = new Timer(async _ =>
{
try
{
var refreshRequest = new SubscribeDebugViewRequest(_selectedInstanceName, Guid.NewGuid().ToString("N"));
var newSnapshot = await CommunicationService.SubscribeDebugViewAsync(site.SiteIdentifier, refreshRequest);
foreach (var av in newSnapshot.AttributeValues)
_attributeValues[av.AttributeName] = av;
foreach (var al in newSnapshot.AlarmStates)
_alarmStates[al.AlarmName] = al;
_snapshot = newSnapshot;
await InvokeAsync(StateHasChanged);
}
catch
{
// Connection may have dropped
}
}, null, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(2));
var instance = _siteInstances.FirstOrDefault(i => i.Id == _selectedInstanceId);
_toast.ShowSuccess($"Streaming {instance?.UniqueName ?? "instance"}");
}
catch (Exception ex)
{
@@ -275,23 +284,15 @@
private async Task Disconnect()
{
_refreshTimer?.Dispose();
_refreshTimer = null;
if (_connected && _selectedSiteId > 0 && !string.IsNullOrEmpty(_selectedInstanceName))
if (_session != null)
{
var site = _sites.FirstOrDefault(s => s.Id == _selectedSiteId);
if (site != null)
{
var request = new UnsubscribeDebugViewRequest(_selectedInstanceName, Guid.NewGuid().ToString("N"));
CommunicationService.UnsubscribeDebugView(site.SiteIdentifier, request);
}
DebugStreamService.StopStream(_session.SessionId);
_session = null;
}
// Clear persisted selection — user explicitly disconnected
await JS.InvokeVoidAsync("localStorage.removeItem", "debugView.siteId");
await JS.InvokeVoidAsync("localStorage.removeItem", "debugView.instanceName");
await JS.InvokeVoidAsync("localStorage.removeItem", "debugView.siteIdentifier");
await JS.InvokeVoidAsync("localStorage.removeItem", "debugView.instanceId");
_connected = false;
_snapshot = null;
@@ -314,6 +315,9 @@
public void Dispose()
{
_refreshTimer?.Dispose();
if (_session != null)
{
DebugStreamService.StopStream(_session.SessionId);
}
}
}

View File

@@ -0,0 +1,100 @@
using Akka.Actor;
using Akka.Event;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Streaming;
namespace ScadaLink.Communication.Actors;
/// <summary>
/// Persistent actor (one per active debug session) on the central side.
/// Sends SubscribeDebugViewRequest to the site via CentralCommunicationActor (with THIS actor
/// as the Sender), so the site's InstanceActor registers this actor as the debug subscriber.
/// Stream events flow back via Akka remoting and are forwarded to the consumer via callbacks.
/// </summary>
public class DebugStreamBridgeActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly string _siteIdentifier;
private readonly string _instanceUniqueName;
private readonly string _correlationId;
private readonly IActorRef _centralCommunicationActor;
private readonly Action<object> _onEvent;
private readonly Action _onTerminated;
public DebugStreamBridgeActor(
string siteIdentifier,
string instanceUniqueName,
string correlationId,
IActorRef centralCommunicationActor,
Action<object> onEvent,
Action onTerminated)
{
_siteIdentifier = siteIdentifier;
_instanceUniqueName = instanceUniqueName;
_correlationId = correlationId;
_centralCommunicationActor = centralCommunicationActor;
_onEvent = onEvent;
_onTerminated = onTerminated;
// Initial snapshot response from the site
Receive<DebugViewSnapshot>(snapshot =>
{
_log.Info("Received initial snapshot for {0} ({1} attrs, {2} alarms)",
_instanceUniqueName, snapshot.AttributeValues.Count, snapshot.AlarmStates.Count);
_onEvent(snapshot);
});
// Ongoing stream events from the site's InstanceActor
Receive<AttributeValueChanged>(changed => _onEvent(changed));
Receive<AlarmStateChanged>(changed => _onEvent(changed));
// Consumer requests stop
Receive<StopDebugStream>(_ =>
{
_log.Info("Stopping debug stream for {0}", _instanceUniqueName);
SendUnsubscribe();
Context.Stop(Self);
});
// Site disconnected — CentralCommunicationActor notifies us
Receive<DebugStreamTerminated>(msg =>
{
_log.Warning("Debug stream terminated for {0} (site {1} disconnected)", _instanceUniqueName, msg.SiteId);
_onTerminated();
Context.Stop(Self);
});
// Orphan safety net — if nobody stops us within 5 minutes, self-terminate
Context.SetReceiveTimeout(TimeSpan.FromMinutes(5));
Receive<ReceiveTimeout>(_ =>
{
_log.Warning("Debug stream for {0} timed out (orphaned session), stopping", _instanceUniqueName);
SendUnsubscribe();
_onTerminated();
Context.Stop(Self);
});
}
protected override void PreStart()
{
_log.Info("Starting debug stream bridge for {0} on site {1}", _instanceUniqueName, _siteIdentifier);
// Send subscribe request via CentralCommunicationActor.
// THIS actor is the Sender, so the site's InstanceActor registers us as the subscriber.
var request = new SubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
var envelope = new SiteEnvelope(_siteIdentifier, request);
_centralCommunicationActor.Tell(envelope, Self);
}
private void SendUnsubscribe()
{
var request = new UnsubscribeDebugViewRequest(_instanceUniqueName, _correlationId);
var envelope = new SiteEnvelope(_siteIdentifier, request);
_centralCommunicationActor.Tell(envelope, Self);
}
}
/// <summary>
/// Message sent to a DebugStreamBridgeActor to stop the debug stream session.
/// </summary>
public record StopDebugStream;

View File

@@ -48,12 +48,17 @@ public class CommunicationService
GetActor().Tell(new RefreshSiteAddresses());
}
private IActorRef GetActor()
/// <summary>
/// Gets the central communication actor reference. Throws if not yet initialized.
/// </summary>
public IActorRef GetCommunicationActor()
{
return _centralCommunicationActor
?? throw new InvalidOperationException("CommunicationService not initialized. CentralCommunicationActor not set.");
}
private IActorRef GetActor() => GetCommunicationActor();
// ── Pattern 1: Instance Deployment ──
public async Task<DeploymentStatusResponse> DeployInstanceAsync(

View File

@@ -0,0 +1,146 @@
using System.Collections.Concurrent;
using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Communication.Actors;
namespace ScadaLink.Communication;
/// <summary>
/// Manages debug stream sessions by creating DebugStreamBridgeActors that persist
/// as subscribers on the site side. Both the Blazor debug view and the SignalR hub
/// use this service to start/stop streams.
/// </summary>
public class DebugStreamService
{
private readonly CommunicationService _communicationService;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<DebugStreamService> _logger;
private readonly ConcurrentDictionary<string, IActorRef> _sessions = new();
private ActorSystem? _actorSystem;
public DebugStreamService(
CommunicationService communicationService,
IServiceProvider serviceProvider,
ILogger<DebugStreamService> logger)
{
_communicationService = communicationService;
_serviceProvider = serviceProvider;
_logger = logger;
}
/// <summary>
/// Sets the ActorSystem reference. Called during actor system startup (from AkkaHostedService).
/// </summary>
public void SetActorSystem(ActorSystem actorSystem)
{
_actorSystem = actorSystem;
}
/// <summary>
/// Starts a debug stream session. Returns the initial snapshot.
/// Ongoing events are delivered via the onEvent callback.
/// The onTerminated callback fires if the stream is killed (site disconnect, timeout).
/// </summary>
public async Task<DebugStreamSession> StartStreamAsync(
int instanceId,
Action<object> onEvent,
Action onTerminated,
CancellationToken ct = default)
{
var system = _actorSystem
?? throw new InvalidOperationException("DebugStreamService not initialized. ActorSystem not set.");
// Resolve instance → unique name + site
string instanceUniqueName;
string siteIdentifier;
using (var scope = _serviceProvider.CreateScope())
{
var instanceRepo = scope.ServiceProvider.GetRequiredService<ITemplateEngineRepository>();
var instance = await instanceRepo.GetInstanceByIdAsync(instanceId)
?? throw new InvalidOperationException($"Instance {instanceId} not found.");
var siteRepo = scope.ServiceProvider.GetRequiredService<ISiteRepository>();
var site = await siteRepo.GetSiteByIdAsync(instance.SiteId)
?? throw new InvalidOperationException($"Site {instance.SiteId} not found.");
instanceUniqueName = instance.UniqueName;
siteIdentifier = site.SiteIdentifier;
}
var sessionId = Guid.NewGuid().ToString("N");
// Capture the initial snapshot via a TaskCompletionSource
var snapshotTcs = new TaskCompletionSource<DebugViewSnapshot>(TaskCreationOptions.RunContinuationsAsynchronously);
Action<object> onEventWrapper = evt =>
{
if (evt is DebugViewSnapshot snapshot && !snapshotTcs.Task.IsCompleted)
{
snapshotTcs.TrySetResult(snapshot);
}
else
{
onEvent(evt);
}
};
Action onTerminatedWrapper = () =>
{
_sessions.TryRemove(sessionId, out _);
snapshotTcs.TrySetException(new InvalidOperationException("Debug stream terminated before snapshot received."));
onTerminated();
};
// Create the bridge actor — use type-based Props to avoid expression tree limitations with closures
var commActor = _communicationService.GetCommunicationActor();
var props = Props.Create(typeof(DebugStreamBridgeActor),
siteIdentifier,
instanceUniqueName,
sessionId,
commActor,
onEventWrapper,
onTerminatedWrapper);
var bridgeActor = system.ActorOf(props, $"debug-stream-{sessionId}");
_sessions[sessionId] = bridgeActor;
// Wait for the initial snapshot (with timeout)
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
timeoutCts.CancelAfter(TimeSpan.FromSeconds(30));
try
{
var snapshot = await snapshotTcs.Task.WaitAsync(timeoutCts.Token);
_logger.LogInformation("Debug stream {SessionId} started for {Instance} on site {Site}",
sessionId, instanceUniqueName, siteIdentifier);
return new DebugStreamSession(sessionId, snapshot);
}
catch (OperationCanceledException)
{
StopStream(sessionId);
throw new TimeoutException($"Timed out waiting for debug snapshot from {instanceUniqueName} on site {siteIdentifier}.");
}
}
/// <summary>
/// Stops an active debug stream session.
/// </summary>
public void StopStream(string sessionId)
{
if (_sessions.TryRemove(sessionId, out var bridgeActor))
{
bridgeActor.Tell(new StopDebugStream());
_logger.LogInformation("Debug stream {SessionId} stopped", sessionId);
}
}
}
public record DebugStreamSession(string SessionId, DebugViewSnapshot InitialSnapshot);

View File

@@ -10,6 +10,7 @@ public static class ServiceCollectionExtensions
.BindConfiguration("Communication");
services.AddSingleton<CommunicationService>();
services.AddSingleton<DebugStreamService>();
return services;
}

View File

@@ -185,6 +185,11 @@ akka {{
var commService = _serviceProvider.GetService<CommunicationService>();
commService?.SetCommunicationActor(centralCommActor);
// Wire up the DebugStreamService with the ActorSystem
var debugStreamService = _serviceProvider.GetService<DebugStreamService>();
debugStreamService?.SetActorSystem(_actorSystem!);
// Management Service — accessible via ClusterClient
var mgmtLogger = _serviceProvider.GetRequiredService<ILoggerFactory>()
.CreateLogger<ScadaLink.ManagementService.ManagementActor>();

View File

@@ -138,6 +138,7 @@ try
app.MapCentralUI<ScadaLink.Host.Components.App>();
app.MapInboundAPI();
app.MapManagementAPI();
app.MapHub<ScadaLink.ManagementService.DebugStreamHub>("/hubs/debug-stream");
// Compile and register all Inbound API method scripts at startup
using (var scope = app.Services.CreateScope())

View File

@@ -0,0 +1,164 @@
using System.Text;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Communication;
using ScadaLink.Security;
namespace ScadaLink.ManagementService;
/// <summary>
/// SignalR hub for real-time debug stream subscriptions.
/// External consumers (CLI) connect via WebSocket, authenticate with Basic Auth,
/// and receive streaming attribute value and alarm state changes.
/// </summary>
public class DebugStreamHub : Hub
{
private const string SessionIdKey = "DebugStreamSessionId";
private readonly DebugStreamService _debugStreamService;
private readonly ILogger<DebugStreamHub> _logger;
public DebugStreamHub(DebugStreamService debugStreamService, ILogger<DebugStreamHub> logger)
{
_debugStreamService = debugStreamService;
_logger = logger;
}
/// <summary>
/// Authenticates the connection using Basic Auth from the HTTP negotiate request.
/// Validates credentials via LDAP and checks for the Deployment role.
/// </summary>
public override async Task OnConnectedAsync()
{
var httpContext = Context.GetHttpContext();
if (httpContext == null)
{
_logger.LogWarning("DebugStreamHub connection rejected: no HTTP context");
Context.Abort();
return;
}
// Extract Basic Auth credentials
var authHeader = httpContext.Request.Headers.Authorization.ToString();
if (string.IsNullOrEmpty(authHeader) || !authHeader.StartsWith("Basic ", StringComparison.OrdinalIgnoreCase))
{
_logger.LogWarning("DebugStreamHub connection rejected: missing Basic Auth header");
Context.Abort();
return;
}
string username, password;
try
{
var decoded = Encoding.UTF8.GetString(Convert.FromBase64String(authHeader["Basic ".Length..]));
var colon = decoded.IndexOf(':');
if (colon < 0) throw new FormatException();
username = decoded[..colon];
password = decoded[(colon + 1)..];
}
catch
{
_logger.LogWarning("DebugStreamHub connection rejected: malformed Basic Auth");
Context.Abort();
return;
}
// LDAP authentication
var ldapAuth = httpContext.RequestServices.GetRequiredService<LdapAuthService>();
var authResult = await ldapAuth.AuthenticateAsync(username, password);
if (!authResult.Success)
{
_logger.LogWarning("DebugStreamHub connection rejected: LDAP auth failed for {Username}", username);
Context.Abort();
return;
}
// Role check — Deployment role required
var roleMapper = httpContext.RequestServices.GetRequiredService<RoleMapper>();
var mappingResult = await roleMapper.MapGroupsToRolesAsync(
authResult.Groups ?? (IReadOnlyList<string>)Array.Empty<string>());
if (!mappingResult.Roles.Contains("Deployment"))
{
_logger.LogWarning("DebugStreamHub connection rejected: {Username} lacks Deployment role", username);
Context.Abort();
return;
}
_logger.LogInformation("DebugStreamHub connection established for {Username}", username);
await base.OnConnectedAsync();
}
/// <summary>
/// Subscribes to a debug stream for the specified instance.
/// Sends the initial snapshot immediately, then streams incremental changes.
/// </summary>
public async Task SubscribeInstance(int instanceId)
{
// Stop any existing subscription for this connection
await UnsubscribeInstance();
var connectionId = Context.ConnectionId;
try
{
var session = await _debugStreamService.StartStreamAsync(
instanceId,
onEvent: evt =>
{
// Fire-and-forget — if the client disconnects, SendAsync will fail silently
_ = evt switch
{
AttributeValueChanged changed =>
Clients.Client(connectionId).SendAsync("OnAttributeChanged", changed),
AlarmStateChanged changed =>
Clients.Client(connectionId).SendAsync("OnAlarmChanged", changed),
DebugViewSnapshot snapshot =>
Clients.Client(connectionId).SendAsync("OnSnapshot", snapshot),
_ => Task.CompletedTask
};
},
onTerminated: () =>
{
_ = Clients.Client(connectionId).SendAsync("OnStreamTerminated", "Site disconnected");
});
Context.Items[SessionIdKey] = session.SessionId;
// Send the initial snapshot
await Clients.Caller.SendAsync("OnSnapshot", session.InitialSnapshot);
_logger.LogInformation("DebugStreamHub: {ConnectionId} subscribed to instance {InstanceId}",
connectionId, instanceId);
}
catch (Exception ex)
{
_logger.LogError(ex, "DebugStreamHub: Failed to subscribe {ConnectionId} to instance {InstanceId}",
connectionId, instanceId);
await Clients.Caller.SendAsync("OnStreamTerminated", ex.Message);
}
}
/// <summary>
/// Unsubscribes from the current debug stream.
/// </summary>
public Task UnsubscribeInstance()
{
if (Context.Items.TryGetValue(SessionIdKey, out var sessionIdObj) && sessionIdObj is string sessionId)
{
_debugStreamService.StopStream(sessionId);
Context.Items.Remove(SessionIdKey);
_logger.LogInformation("DebugStreamHub: {ConnectionId} unsubscribed", Context.ConnectionId);
}
return Task.CompletedTask;
}
public override async Task OnDisconnectedAsync(Exception? exception)
{
await UnsubscribeInstance();
await base.OnDisconnectedAsync(exception);
}
}