Phase 3B: Site I/O & Observability — Communication, DCL, Script/Alarm actors, Health, Event Logging

Communication Layer (WP-1–5):
- 8 message patterns with correlation IDs, per-pattern timeouts
- Central/Site communication actors, transport heartbeat config
- Connection failure handling (no central buffering, debug streams killed)

Data Connection Layer (WP-6–14, WP-34):
- Connection actor with Become/Stash lifecycle (Connecting/Connected/Reconnecting)
- OPC UA + LmxProxy adapters behind IDataConnection
- Auto-reconnect, bad quality propagation, transparent re-subscribe
- Write-back, tag path resolution with retry, health reporting
- Protocol extensibility via DataConnectionFactory

Site Runtime (WP-15–25, WP-32–33):
- ScriptActor/ScriptExecutionActor (triggers, concurrent execution, blocking I/O dispatcher)
- AlarmActor/AlarmExecutionActor (ValueMatch/RangeViolation/RateOfChange, in-memory state)
- SharedScriptLibrary (inline execution), ScriptRuntimeContext (API)
- ScriptCompilationService (Roslyn, forbidden API enforcement, execution timeout)
- Recursion limit (default 10), call direction enforcement
- SiteStreamManager (per-subscriber bounded buffers, fire-and-forget)
- Debug view backend (snapshot + stream), concurrency serialization
- Local artifact storage (4 SQLite tables)

Health Monitoring (WP-26–28):
- SiteHealthCollector (thread-safe counters, connection state)
- HealthReportSender (30s interval, monotonic sequence numbers)
- CentralHealthAggregator (offline detection 60s, online recovery)

Site Event Logging (WP-29–31):
- SiteEventLogger (SQLite, 6 event categories, ISO 8601 UTC)
- EventLogPurgeService (30-day retention, 1GB cap)
- EventLogQueryService (filters, keyword search, keyset pagination)

541 tests pass, zero warnings.
This commit is contained in:
Joseph Doherty
2026-03-16 20:57:25 -04:00
parent a3bf0c43f3
commit 389f5a0378
97 changed files with 8308 additions and 127 deletions

View File

@@ -0,0 +1,172 @@
using Akka.Actor;
using Akka.Event;
using ScadaLink.Commons.Messages.Communication;
using ScadaLink.Commons.Messages.Health;
namespace ScadaLink.Communication.Actors;
/// <summary>
/// Central-side actor that routes messages from central to site clusters via Akka remoting.
/// Maintains a registry of known site actor paths (learned from heartbeats/connection events).
///
/// WP-4: All 8 message patterns routed through this actor.
/// WP-5: Ask timeout on connection drop (no central buffering). Debug streams killed on interruption.
/// </summary>
public class CentralCommunicationActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
/// <summary>
/// Maps SiteId → remote SiteCommunicationActor selection.
/// Updated when heartbeats arrive or connection state changes.
/// </summary>
private readonly Dictionary<string, ActorSelection> _siteSelections = new();
/// <summary>
/// Tracks active debug view subscriptions: correlationId → (siteId, subscriber).
/// Used to kill debug streams on site disconnection (WP-5).
/// </summary>
private readonly Dictionary<string, (string SiteId, IActorRef Subscriber)> _debugSubscriptions = new();
/// <summary>
/// Tracks in-progress deployments: deploymentId → siteId.
/// On central failover, in-progress deployments are treated as failed (WP-5).
/// </summary>
private readonly Dictionary<string, string> _inProgressDeployments = new();
public CentralCommunicationActor()
{
// Site registration via heartbeats
Receive<HeartbeatMessage>(HandleHeartbeat);
// Connection state changes
Receive<ConnectionStateChanged>(HandleConnectionStateChanged);
// Site registration command (manual or from discovery)
Receive<RegisterSite>(HandleRegisterSite);
// Route enveloped messages to sites
Receive<SiteEnvelope>(HandleSiteEnvelope);
}
private void HandleHeartbeat(HeartbeatMessage heartbeat)
{
// Heartbeats arrive from sites — forward to any interested central actors
// The sender's path tells us the site's communication actor address
if (!_siteSelections.ContainsKey(heartbeat.SiteId))
{
var senderPath = Sender.Path.ToString();
_log.Info("Learned site {0} from heartbeat at path {1}", heartbeat.SiteId, senderPath);
}
// Forward heartbeat to parent/subscribers (central health monitoring)
Context.Parent.Tell(heartbeat);
}
private void HandleConnectionStateChanged(ConnectionStateChanged msg)
{
if (!msg.IsConnected)
{
_log.Warning("Site {0} disconnected at {1}", msg.SiteId, msg.Timestamp);
// WP-5: Kill active debug streams for the disconnected site
var toRemove = _debugSubscriptions
.Where(kvp => kvp.Value.SiteId == msg.SiteId)
.ToList();
foreach (var kvp in toRemove)
{
_log.Info("Killing debug stream {0} for disconnected site {1}", kvp.Key, msg.SiteId);
kvp.Value.Subscriber.Tell(new DebugStreamTerminated(msg.SiteId, kvp.Key));
_debugSubscriptions.Remove(kvp.Key);
}
// WP-5: Mark in-progress deployments as failed
var failedDeployments = _inProgressDeployments
.Where(kvp => kvp.Value == msg.SiteId)
.Select(kvp => kvp.Key)
.ToList();
foreach (var deploymentId in failedDeployments)
{
_log.Warning("Deployment {0} to site {1} treated as failed due to disconnection",
deploymentId, msg.SiteId);
_inProgressDeployments.Remove(deploymentId);
}
_siteSelections.Remove(msg.SiteId);
}
else
{
_log.Info("Site {0} connected at {1}", msg.SiteId, msg.Timestamp);
}
}
private void HandleRegisterSite(RegisterSite msg)
{
var selection = Context.ActorSelection(msg.RemoteActorPath);
_siteSelections[msg.SiteId] = selection;
_log.Info("Registered site {0} at path {1}", msg.SiteId, msg.RemoteActorPath);
}
private void HandleSiteEnvelope(SiteEnvelope envelope)
{
if (!_siteSelections.TryGetValue(envelope.SiteId, out var siteSelection))
{
_log.Warning("No known path for site {0}, cannot route message {1}",
envelope.SiteId, envelope.Message.GetType().Name);
// The Ask will timeout on the caller side — no central buffering (WP-5)
return;
}
// Track debug subscriptions for cleanup on disconnect
TrackMessageForCleanup(envelope);
// Forward the inner message to the site, preserving the original sender
// so the site can reply directly to the caller (completing the Ask pattern)
siteSelection.Tell(envelope.Message, Sender);
}
private void TrackMessageForCleanup(SiteEnvelope envelope)
{
switch (envelope.Message)
{
case Commons.Messages.DebugView.SubscribeDebugViewRequest sub:
_debugSubscriptions[sub.CorrelationId] = (envelope.SiteId, Sender);
break;
case Commons.Messages.DebugView.UnsubscribeDebugViewRequest unsub:
_debugSubscriptions.Remove(unsub.CorrelationId);
break;
case Commons.Messages.Deployment.DeployInstanceCommand deploy:
_inProgressDeployments[deploy.DeploymentId] = envelope.SiteId;
break;
}
}
protected override void PreStart()
{
_log.Info("CentralCommunicationActor started");
}
protected override void PostStop()
{
_log.Info("CentralCommunicationActor stopped. In-progress deployments treated as failed (WP-5).");
// On central failover, all in-progress deployments are failed
_inProgressDeployments.Clear();
_debugSubscriptions.Clear();
}
}
/// <summary>
/// Command to register a site's remote communication actor path.
/// </summary>
public record RegisterSite(string SiteId, string RemoteActorPath);
/// <summary>
/// Notification sent to debug view subscribers when the stream is terminated
/// due to site disconnection (WP-5).
/// </summary>
public record DebugStreamTerminated(string SiteId, string CorrelationId);

View File

@@ -0,0 +1,212 @@
using Akka.Actor;
using Akka.Event;
using ScadaLink.Commons.Messages.Artifacts;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Deployment;
using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.Commons.Messages.Lifecycle;
using ScadaLink.Commons.Messages.RemoteQuery;
namespace ScadaLink.Communication.Actors;
/// <summary>
/// Site-side actor that receives messages from central via Akka remoting and routes
/// them to the appropriate local actors. Also sends heartbeats and health reports
/// to central.
///
/// WP-4: Routes all 8 message patterns to local handlers.
/// </summary>
public class SiteCommunicationActor : ReceiveActor, IWithTimers
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly string _siteId;
private readonly CommunicationOptions _options;
/// <summary>
/// Reference to the local Deployment Manager singleton proxy.
/// </summary>
private readonly IActorRef _deploymentManagerProxy;
/// <summary>
/// Optional reference to the central communication actor for sending heartbeats/health.
/// Set via RegisterCentral message.
/// </summary>
private ActorSelection? _centralSelection;
/// <summary>
/// Local actor references for routing specific message patterns.
/// Populated via registration messages.
/// </summary>
private IActorRef? _eventLogHandler;
private IActorRef? _parkedMessageHandler;
private IActorRef? _integrationHandler;
private IActorRef? _artifactHandler;
public ITimerScheduler Timers { get; set; } = null!;
public SiteCommunicationActor(
string siteId,
CommunicationOptions options,
IActorRef deploymentManagerProxy)
{
_siteId = siteId;
_options = options;
_deploymentManagerProxy = deploymentManagerProxy;
// Registration
Receive<RegisterCentralPath>(HandleRegisterCentral);
Receive<RegisterLocalHandler>(HandleRegisterLocalHandler);
// Pattern 1: Instance Deployment — forward to Deployment Manager
Receive<DeployInstanceCommand>(msg =>
{
_log.Debug("Routing DeployInstanceCommand for {0} to DeploymentManager", msg.InstanceUniqueName);
_deploymentManagerProxy.Forward(msg);
});
// Pattern 2: Lifecycle — forward to Deployment Manager
Receive<DisableInstanceCommand>(msg => _deploymentManagerProxy.Forward(msg));
Receive<EnableInstanceCommand>(msg => _deploymentManagerProxy.Forward(msg));
Receive<DeleteInstanceCommand>(msg => _deploymentManagerProxy.Forward(msg));
// Pattern 3: Artifact Deployment — forward to artifact handler if registered
Receive<DeployArtifactsCommand>(msg =>
{
if (_artifactHandler != null)
_artifactHandler.Forward(msg);
else
{
_log.Warning("No artifact handler registered, replying with failure");
Sender.Tell(new ArtifactDeploymentResponse(
msg.DeploymentId, _siteId, false, "Artifact handler not available", DateTimeOffset.UtcNow));
}
});
// Pattern 4: Integration Routing — forward to integration handler
Receive<IntegrationCallRequest>(msg =>
{
if (_integrationHandler != null)
_integrationHandler.Forward(msg);
else
{
Sender.Tell(new IntegrationCallResponse(
msg.CorrelationId, _siteId, false, null, "Integration handler not available", DateTimeOffset.UtcNow));
}
});
// Pattern 5: Debug View — forward to Deployment Manager (which routes to Instance Actor)
Receive<SubscribeDebugViewRequest>(msg => _deploymentManagerProxy.Forward(msg));
Receive<UnsubscribeDebugViewRequest>(msg => _deploymentManagerProxy.Forward(msg));
// Pattern 7: Remote Queries
Receive<EventLogQueryRequest>(msg =>
{
if (_eventLogHandler != null)
_eventLogHandler.Forward(msg);
else
{
Sender.Tell(new EventLogQueryResponse(
msg.CorrelationId, _siteId, [], null, false, false,
"Event log handler not available", DateTimeOffset.UtcNow));
}
});
Receive<ParkedMessageQueryRequest>(msg =>
{
if (_parkedMessageHandler != null)
_parkedMessageHandler.Forward(msg);
else
{
Sender.Tell(new ParkedMessageQueryResponse(
msg.CorrelationId, _siteId, [], 0, msg.PageNumber, msg.PageSize, false,
"Parked message handler not available", DateTimeOffset.UtcNow));
}
});
// Internal: send heartbeat tick
Receive<SendHeartbeat>(_ => SendHeartbeatToCentral());
// Internal: forward health report to central
Receive<SiteHealthReport>(msg =>
{
_centralSelection?.Tell(msg, Self);
});
}
protected override void PreStart()
{
_log.Info("SiteCommunicationActor started for site {0}", _siteId);
// Schedule periodic heartbeat to central
Timers.StartPeriodicTimer(
"heartbeat",
new SendHeartbeat(),
TimeSpan.FromSeconds(1), // initial delay
_options.TransportHeartbeatInterval);
}
private void HandleRegisterCentral(RegisterCentralPath msg)
{
_centralSelection = Context.ActorSelection(msg.CentralActorPath);
_log.Info("Registered central communication path: {0}", msg.CentralActorPath);
}
private void HandleRegisterLocalHandler(RegisterLocalHandler msg)
{
switch (msg.HandlerType)
{
case LocalHandlerType.EventLog:
_eventLogHandler = msg.Handler;
break;
case LocalHandlerType.ParkedMessages:
_parkedMessageHandler = msg.Handler;
break;
case LocalHandlerType.Integration:
_integrationHandler = msg.Handler;
break;
case LocalHandlerType.Artifacts:
_artifactHandler = msg.Handler;
break;
}
_log.Info("Registered local handler for {0}", msg.HandlerType);
}
private void SendHeartbeatToCentral()
{
if (_centralSelection == null)
return;
var hostname = Environment.MachineName;
var heartbeat = new HeartbeatMessage(
_siteId,
hostname,
IsActive: true,
DateTimeOffset.UtcNow);
_centralSelection.Tell(heartbeat, Self);
}
// ── Internal messages ──
internal record SendHeartbeat;
}
/// <summary>
/// Command to register the central communication actor path for outbound messages.
/// </summary>
public record RegisterCentralPath(string CentralActorPath);
/// <summary>
/// Command to register a local actor as a handler for a specific message pattern.
/// </summary>
public record RegisterLocalHandler(LocalHandlerType HandlerType, IActorRef Handler);
public enum LocalHandlerType
{
EventLog,
ParkedMessages,
Integration,
Artifacts
}

View File

@@ -1,10 +1,35 @@
namespace ScadaLink.Communication;
/// <summary>
/// Configuration options for central-site communication, including per-pattern
/// timeouts and transport heartbeat settings.
/// </summary>
public class CommunicationOptions
{
/// <summary>Timeout for deployment commands (typically longest due to apply logic).</summary>
public TimeSpan DeploymentTimeout { get; set; } = TimeSpan.FromMinutes(2);
/// <summary>Timeout for lifecycle commands (disable, enable, delete).</summary>
public TimeSpan LifecycleTimeout { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>Timeout for artifact deployment commands.</summary>
public TimeSpan ArtifactDeploymentTimeout { get; set; } = TimeSpan.FromMinutes(1);
/// <summary>Timeout for remote query requests (event logs, parked messages).</summary>
public TimeSpan QueryTimeout { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>Timeout for integration call routing.</summary>
public TimeSpan IntegrationTimeout { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>Timeout for debug view subscribe/unsubscribe handshake.</summary>
public TimeSpan DebugViewTimeout { get; set; } = TimeSpan.FromSeconds(10);
/// <summary>Timeout for health report acknowledgement (fire-and-forget, but bounded).</summary>
public TimeSpan HealthReportTimeout { get; set; } = TimeSpan.FromSeconds(10);
/// <summary>Akka.Remote transport heartbeat interval.</summary>
public TimeSpan TransportHeartbeatInterval { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>Akka.Remote transport failure detection threshold.</summary>
public TimeSpan TransportFailureThreshold { get; set; } = TimeSpan.FromSeconds(15);
}

View File

@@ -0,0 +1,152 @@
using Akka.Actor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Messages.Artifacts;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Deployment;
using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.Commons.Messages.Lifecycle;
using ScadaLink.Commons.Messages.RemoteQuery;
namespace ScadaLink.Communication;
/// <summary>
/// Central-side service that wraps the Akka Ask pattern with per-pattern timeouts.
/// Provides a typed API for sending messages to sites and awaiting responses.
/// On connection drop, the ask times out (no central buffering per design).
/// </summary>
public class CommunicationService
{
private readonly CommunicationOptions _options;
private readonly ILogger<CommunicationService> _logger;
private IActorRef? _centralCommunicationActor;
public CommunicationService(
IOptions<CommunicationOptions> options,
ILogger<CommunicationService> logger)
{
_options = options.Value;
_logger = logger;
}
/// <summary>
/// Sets the central communication actor reference. Called during actor system startup.
/// </summary>
public void SetCommunicationActor(IActorRef centralCommunicationActor)
{
_centralCommunicationActor = centralCommunicationActor;
}
private IActorRef GetActor()
{
return _centralCommunicationActor
?? throw new InvalidOperationException("CommunicationService not initialized. CentralCommunicationActor not set.");
}
// ── Pattern 1: Instance Deployment ──
public async Task<DeploymentStatusResponse> DeployInstanceAsync(
string siteId, DeployInstanceCommand command, CancellationToken cancellationToken = default)
{
_logger.LogDebug(
"Sending DeployInstanceCommand to site {SiteId}, instance={Instance}, correlationId={DeploymentId}",
siteId, command.InstanceUniqueName, command.DeploymentId);
var envelope = new SiteEnvelope(siteId, command);
return await GetActor().Ask<DeploymentStatusResponse>(
envelope, _options.DeploymentTimeout, cancellationToken);
}
// ── Pattern 2: Lifecycle ──
public async Task<InstanceLifecycleResponse> DisableInstanceAsync(
string siteId, DisableInstanceCommand command, CancellationToken cancellationToken = default)
{
var envelope = new SiteEnvelope(siteId, command);
return await GetActor().Ask<InstanceLifecycleResponse>(
envelope, _options.LifecycleTimeout, cancellationToken);
}
public async Task<InstanceLifecycleResponse> EnableInstanceAsync(
string siteId, EnableInstanceCommand command, CancellationToken cancellationToken = default)
{
var envelope = new SiteEnvelope(siteId, command);
return await GetActor().Ask<InstanceLifecycleResponse>(
envelope, _options.LifecycleTimeout, cancellationToken);
}
public async Task<InstanceLifecycleResponse> DeleteInstanceAsync(
string siteId, DeleteInstanceCommand command, CancellationToken cancellationToken = default)
{
var envelope = new SiteEnvelope(siteId, command);
return await GetActor().Ask<InstanceLifecycleResponse>(
envelope, _options.LifecycleTimeout, cancellationToken);
}
// ── Pattern 3: Artifact Deployment ──
public async Task<ArtifactDeploymentResponse> DeployArtifactsAsync(
string siteId, DeployArtifactsCommand command, CancellationToken cancellationToken = default)
{
var envelope = new SiteEnvelope(siteId, command);
return await GetActor().Ask<ArtifactDeploymentResponse>(
envelope, _options.ArtifactDeploymentTimeout, cancellationToken);
}
// ── Pattern 4: Integration Routing ──
public async Task<IntegrationCallResponse> RouteIntegrationCallAsync(
string siteId, IntegrationCallRequest request, CancellationToken cancellationToken = default)
{
var envelope = new SiteEnvelope(siteId, request);
return await GetActor().Ask<IntegrationCallResponse>(
envelope, _options.IntegrationTimeout, cancellationToken);
}
// ── Pattern 5: Debug View ──
public async Task<DebugViewSnapshot> SubscribeDebugViewAsync(
string siteId, SubscribeDebugViewRequest request, CancellationToken cancellationToken = default)
{
var envelope = new SiteEnvelope(siteId, request);
return await GetActor().Ask<DebugViewSnapshot>(
envelope, _options.DebugViewTimeout, cancellationToken);
}
public void UnsubscribeDebugView(string siteId, UnsubscribeDebugViewRequest request)
{
// Tell (fire-and-forget) — no response expected
GetActor().Tell(new SiteEnvelope(siteId, request));
}
// ── Pattern 6: Health Reporting (site→central, Tell) ──
// Health reports are received by central, not sent. No method needed here.
// ── Pattern 7: Remote Queries ──
public async Task<EventLogQueryResponse> QueryEventLogsAsync(
string siteId, EventLogQueryRequest request, CancellationToken cancellationToken = default)
{
var envelope = new SiteEnvelope(siteId, request);
return await GetActor().Ask<EventLogQueryResponse>(
envelope, _options.QueryTimeout, cancellationToken);
}
public async Task<ParkedMessageQueryResponse> QueryParkedMessagesAsync(
string siteId, ParkedMessageQueryRequest request, CancellationToken cancellationToken = default)
{
var envelope = new SiteEnvelope(siteId, request);
return await GetActor().Ask<ParkedMessageQueryResponse>(
envelope, _options.QueryTimeout, cancellationToken);
}
// ── Pattern 8: Heartbeat (site→central, Tell) ──
// Heartbeats are received by central, not sent. No method needed here.
}
/// <summary>
/// Envelope that wraps any message with a target site ID for routing.
/// Used by CentralCommunicationActor to resolve the site actor path.
/// </summary>
public record SiteEnvelope(string SiteId, object Message);

View File

@@ -8,8 +8,14 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka" Version="1.5.62" />
<PackageReference Include="Akka.Remote" Version="1.5.62" />
<PackageReference Include="Akka.Cluster" Version="1.5.62" />
<PackageReference Include="Akka.Cluster.Tools" Version="1.5.62" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.5" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.5" />
<PackageReference Include="Microsoft.Extensions.Options" Version="10.0.5" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="10.0.5" />
</ItemGroup>
<ItemGroup>

View File

@@ -6,13 +6,18 @@ public static class ServiceCollectionExtensions
{
public static IServiceCollection AddCommunication(this IServiceCollection services)
{
// Phase 0: skeleton only
services.AddOptions<CommunicationOptions>()
.BindConfiguration("Communication");
services.AddSingleton<CommunicationService>();
return services;
}
public static IServiceCollection AddCommunicationActors(this IServiceCollection services)
{
// Phase 0: placeholder for Akka actor registration
// Actor registration happens in AkkaHostedService.RegisterCentralActors/RegisterSiteActors.
// This method is a hook for any additional DI registrations needed by the communication actors.
return services;
}
}