Files
scadalink-design/src/ScadaLink.ManagementService/DebugStreamHub.cs

235 lines
9.7 KiB
C#

using System.Text;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Streaming;
using ScadaLink.Communication;
using ScadaLink.Security;
namespace ScadaLink.ManagementService;
/// <summary>
/// SignalR hub for real-time debug stream subscriptions.
/// External consumers (CLI) connect via WebSocket, authenticate with Basic Auth,
/// and receive streaming attribute value and alarm state changes.
/// </summary>
/// <remarks>
/// Per-connection state is kept in <c>Context.Items</c>: the resolved roles and
/// permitted site ids are stored by <see cref="OnConnectedAsync"/>, and the active
/// stream session id is stored by <see cref="SubscribeInstance"/>. A connection holds
/// at most one stream session at a time.
/// </remarks>
public class DebugStreamHub : Hub
{
    // Keys into Context.Items for per-connection state.
    private const string SessionIdKey = "DebugStreamSessionId";
    private const string RolesKey = "DebugStreamRoles";
    private const string PermittedSiteIdsKey = "DebugStreamPermittedSiteIds";

    private const string BasicSchemePrefix = "Basic ";
    private const string AdminRole = "Admin";
    private const string DeploymentRole = "Deployment";

    /// <summary>
    /// Pure site-scope authorization check for a debug-stream subscription.
    /// Returns true when the caller may subscribe to a debug stream for an instance
    /// belonging to <paramref name="instanceSiteId"/>.
    /// Admin role, or an empty <paramref name="permittedSiteIds"/> (system-wide
    /// Deployment), grants access to any site; otherwise the instance's site must be
    /// in the permitted set.
    /// </summary>
    /// <param name="roles">Roles resolved for the caller; "Admin" is matched case-insensitively.</param>
    /// <param name="permittedSiteIds">Site ids (as strings) the caller is scoped to; empty means unrestricted.</param>
    /// <param name="instanceSiteId">Site id of the instance being subscribed to.</param>
    /// <returns><c>true</c> when access is allowed; otherwise <c>false</c>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="roles"/> or <paramref name="permittedSiteIds"/> is null.
    /// </exception>
    public static bool IsInstanceAccessAllowed(
        IReadOnlyCollection<string> roles,
        IReadOnlyCollection<string> permittedSiteIds,
        int instanceSiteId)
    {
        ArgumentNullException.ThrowIfNull(roles);
        ArgumentNullException.ThrowIfNull(permittedSiteIds);

        if (roles.Contains(AdminRole, StringComparer.OrdinalIgnoreCase))
        {
            return true;
        }

        if (permittedSiteIds.Count == 0)
        {
            return true; // system-wide deployment
        }

        // Site ids are machine identifiers: format with the invariant culture so the
        // outcome never depends on the thread's current culture.
        var siteKey = instanceSiteId.ToString(System.Globalization.CultureInfo.InvariantCulture);
        return permittedSiteIds.Contains(siteKey, StringComparer.Ordinal);
    }

    private readonly DebugStreamService _debugStreamService;
    private readonly IHubContext<DebugStreamHub> _hubContext;
    private readonly ILogger<DebugStreamHub> _logger;

    /// <summary>
    /// Creates the hub with the stream service, a hub context used for callbacks that
    /// outlive a single hub method invocation, and a logger.
    /// </summary>
    public DebugStreamHub(
        DebugStreamService debugStreamService,
        IHubContext<DebugStreamHub> hubContext,
        ILogger<DebugStreamHub> logger)
    {
        _debugStreamService = debugStreamService;
        _hubContext = hubContext;
        _logger = logger;
    }

    /// <summary>
    /// Authenticates the connection using Basic Auth from the HTTP negotiate request.
    /// Validates credentials via LDAP and checks for the Deployment role.
    /// On any failure the connection is aborted; on success the resolved roles and
    /// permitted site ids are stored on the connection for later subscribe calls.
    /// </summary>
    public override async Task OnConnectedAsync()
    {
        var httpContext = Context.GetHttpContext();
        if (httpContext is null)
        {
            _logger.LogWarning("DebugStreamHub connection rejected: no HTTP context");
            Context.Abort();
            return;
        }

        // Extract Basic Auth credentials
        var authHeader = httpContext.Request.Headers.Authorization.ToString();
        if (string.IsNullOrEmpty(authHeader)
            || !authHeader.StartsWith(BasicSchemePrefix, StringComparison.OrdinalIgnoreCase))
        {
            _logger.LogWarning("DebugStreamHub connection rejected: missing Basic Auth header");
            Context.Abort();
            return;
        }

        if (!TryParseBasicCredentials(authHeader, out var username, out var password))
        {
            _logger.LogWarning("DebugStreamHub connection rejected: malformed Basic Auth");
            Context.Abort();
            return;
        }

        // An LDAP simple bind with an empty password is an "unauthenticated bind" that
        // some servers accept; never forward empty credentials to the directory.
        if (username.Length == 0 || password.Length == 0)
        {
            _logger.LogWarning("DebugStreamHub connection rejected: empty username or password");
            Context.Abort();
            return;
        }

        // LDAP authentication
        var ldapAuth = httpContext.RequestServices.GetRequiredService<LdapAuthService>();
        var authResult = await ldapAuth.AuthenticateAsync(username, password);
        if (!authResult.Success)
        {
            _logger.LogWarning("DebugStreamHub connection rejected: LDAP auth failed for {Username}", username);
            Context.Abort();
            return;
        }

        // Role check — Deployment role required
        var roleMapper = httpContext.RequestServices.GetRequiredService<RoleMapper>();
        var mappingResult = await roleMapper.MapGroupsToRolesAsync(
            authResult.Groups ?? (IReadOnlyList<string>)Array.Empty<string>());
        if (!mappingResult.Roles.Contains(DeploymentRole))
        {
            _logger.LogWarning("DebugStreamHub connection rejected: {Username} lacks Deployment role", username);
            Context.Abort();
            return;
        }

        // Persist the resolved identity on the connection so per-instance site-scope
        // enforcement can be applied to SubscribeInstance calls.
        Context.Items[RolesKey] = mappingResult.Roles.ToArray();
        Context.Items[PermittedSiteIdsKey] = mappingResult.PermittedSiteIds.ToArray();

        _logger.LogInformation("DebugStreamHub connection established for {Username}", username);
        await base.OnConnectedAsync();
    }

    /// <summary>
    /// Subscribes to a debug stream for the specified instance.
    /// Sends the initial snapshot immediately, then streams incremental changes.
    /// Any existing subscription on this connection is stopped first. Failures are
    /// reported to the caller through an <c>OnStreamTerminated</c> message.
    /// </summary>
    /// <param name="instanceId">Id of the instance whose debug stream is requested.</param>
    public async Task SubscribeInstance(int instanceId)
    {
        // Stop any existing subscription for this connection
        await UnsubscribeInstance();

        var connectionId = Context.ConnectionId;

        // Per-instance site-scope enforcement: a site-scoped Deployment user must not
        // be able to stream an instance belonging to a site outside their scope.
        var httpContext = Context.GetHttpContext();
        if (httpContext is null)
        {
            _logger.LogWarning("DebugStreamHub: {ConnectionId} subscribe rejected — no HTTP context", connectionId);
            await Clients.Caller.SendAsync("OnStreamTerminated", "Authorization context unavailable.");
            return;
        }

        // Fail closed: a connection without a stored identity must not fall through to
        // the "empty permitted set means system-wide" rule in IsInstanceAccessAllowed.
        if (!Context.Items.TryGetValue(RolesKey, out var rolesObj)
            || rolesObj is not string[] roles
            || !Context.Items.TryGetValue(PermittedSiteIdsKey, out var sitesObj)
            || sitesObj is not string[] permittedSiteIds)
        {
            _logger.LogWarning(
                "DebugStreamHub: {ConnectionId} subscribe rejected — no authenticated identity on connection",
                connectionId);
            await Clients.Caller.SendAsync("OnStreamTerminated", "Authorization context unavailable.");
            return;
        }

        var instanceRepo = httpContext.RequestServices.GetRequiredService<ITemplateEngineRepository>();
        var instance = await instanceRepo.GetInstanceByIdAsync(instanceId);
        if (instance == null)
        {
            _logger.LogWarning("DebugStreamHub: {ConnectionId} subscribe rejected — instance {InstanceId} not found",
                connectionId, instanceId);
            await Clients.Caller.SendAsync("OnStreamTerminated", $"Instance {instanceId} not found.");
            return;
        }

        if (!IsInstanceAccessAllowed(roles, permittedSiteIds, instance.SiteId))
        {
            _logger.LogWarning(
                "DebugStreamHub: {ConnectionId} subscribe to instance {InstanceId} denied — site {SiteId} outside permitted scope",
                connectionId, instanceId, instance.SiteId);
            await Clients.Caller.SendAsync("OnStreamTerminated",
                $"Access denied: instance {instanceId} belongs to a site outside your Deployment scope.");
            return;
        }

        try
        {
            // Use IHubContext for callbacks — the hub instance is transient (disposed after method returns),
            // but IHubContext is a singleton that remains valid for the lifetime of the connection.
            // Copy the logger to a local for the same reason: the callbacks must not capture `this`.
            var hubClients = _hubContext.Clients;
            ILogger logger = _logger;

            var session = await _debugStreamService.StartStreamAsync(
                instanceId,
                onEvent: evt =>
                {
                    // Fire-and-forget; a faulted send is observed and logged by ObserveSendFailure.
                    Task send = evt switch
                    {
                        AttributeValueChanged changed =>
                            hubClients.Client(connectionId).SendAsync("OnAttributeChanged", changed),
                        AlarmStateChanged changed =>
                            hubClients.Client(connectionId).SendAsync("OnAlarmChanged", changed),
                        DebugViewSnapshot snapshot =>
                            hubClients.Client(connectionId).SendAsync("OnSnapshot", snapshot),
                        _ => Task.CompletedTask
                    };
                    ObserveSendFailure(send, logger, connectionId);
                },
                onTerminated: () =>
                {
                    ObserveSendFailure(
                        hubClients.Client(connectionId).SendAsync("OnStreamTerminated", "Site disconnected"),
                        logger,
                        connectionId);
                });

            Context.Items[SessionIdKey] = session.SessionId;

            // Send the initial snapshot
            await Clients.Caller.SendAsync("OnSnapshot", session.InitialSnapshot);

            _logger.LogInformation("DebugStreamHub: {ConnectionId} subscribed to instance {InstanceId}",
                connectionId, instanceId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "DebugStreamHub: Failed to subscribe {ConnectionId} to instance {InstanceId}",
                connectionId, instanceId);

            // The client is about to be told the stream is terminated, so a session that
            // was already started must not keep running until the connection drops.
            try
            {
                await UnsubscribeInstance();
            }
            catch (Exception stopEx)
            {
                _logger.LogWarning(stopEx,
                    "DebugStreamHub: failed to stop stream for {ConnectionId} after subscribe failure",
                    connectionId);
            }

            // NOTE(review): ex.Message is forwarded verbatim to the client; confirm that
            // exposing internal exception text to CLI users is acceptable.
            await Clients.Caller.SendAsync("OnStreamTerminated", ex.Message);
        }
    }

    /// <summary>
    /// Unsubscribes from the current debug stream.
    /// Does nothing when the connection has no active stream session.
    /// </summary>
    public Task UnsubscribeInstance()
    {
        if (Context.Items.TryGetValue(SessionIdKey, out var sessionIdObj) && sessionIdObj is string sessionId)
        {
            _debugStreamService.StopStream(sessionId);
            Context.Items.Remove(SessionIdKey);
            _logger.LogInformation("DebugStreamHub: {ConnectionId} unsubscribed", Context.ConnectionId);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Stops any active stream session when the connection closes.
    /// </summary>
    public override async Task OnDisconnectedAsync(Exception? exception)
    {
        await UnsubscribeInstance();
        await base.OnDisconnectedAsync(exception);
    }

    /// <summary>
    /// Decodes a <c>Basic</c> Authorization header value into its username and password,
    /// split at the first colon. Returns false when the payload is not valid Base64 or
    /// contains no colon. The caller must already have verified the scheme prefix.
    /// </summary>
    private static bool TryParseBasicCredentials(string authHeader, out string username, out string password)
    {
        username = string.Empty;
        password = string.Empty;

        byte[] raw;
        try
        {
            raw = Convert.FromBase64String(authHeader[BasicSchemePrefix.Length..]);
        }
        catch (FormatException)
        {
            return false;
        }

        var decoded = Encoding.UTF8.GetString(raw);
        var colon = decoded.IndexOf(':');
        if (colon < 0)
        {
            return false;
        }

        username = decoded[..colon];
        password = decoded[(colon + 1)..];
        return true;
    }

    /// <summary>
    /// Attaches a fault-only continuation to a fire-and-forget send so its exception is
    /// observed and logged at Debug level instead of surfacing as an unobserved task exception.
    /// </summary>
    private static void ObserveSendFailure(Task send, ILogger logger, string connectionId)
    {
        if (send.IsCompletedSuccessfully)
        {
            return;
        }

        _ = send.ContinueWith(
            faulted => logger.LogDebug(
                faulted.Exception,
                "DebugStreamHub: send to {ConnectionId} failed",
                connectionId),
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
    }
}