diff --git a/src/ScadaLink.Commons/Messages/DataConnection/DataConnectionHealthReport.cs b/src/ScadaLink.Commons/Messages/DataConnection/DataConnectionHealthReport.cs
new file mode 100644
index 0000000..4c4fc7d
--- /dev/null
+++ b/src/ScadaLink.Commons/Messages/DataConnection/DataConnectionHealthReport.cs
@@ -0,0 +1,13 @@
+using ScadaLink.Commons.Types.Enums;
+
+namespace ScadaLink.Commons.Messages.DataConnection;
+
+/// <summary>
+/// Health metrics for a single data connection (status plus subscribed/resolved tag counts), contributed to the site health report.
+/// </summary>
+public record DataConnectionHealthReport(
+ string ConnectionName,
+ ConnectionHealth Status,
+ int TotalSubscribedTags,
+ int ResolvedTags,
+ DateTimeOffset Timestamp);
diff --git a/src/ScadaLink.Commons/Messages/DataConnection/SubscribeTagsRequest.cs b/src/ScadaLink.Commons/Messages/DataConnection/SubscribeTagsRequest.cs
new file mode 100644
index 0000000..2288b01
--- /dev/null
+++ b/src/ScadaLink.Commons/Messages/DataConnection/SubscribeTagsRequest.cs
@@ -0,0 +1,11 @@
+namespace ScadaLink.Commons.Messages.DataConnection;
+
+/// <summary>
+/// Request from an Instance Actor to subscribe to tag values on a named connection through the DCL.
+/// </summary>
+public record SubscribeTagsRequest(
+ string CorrelationId,
+ string InstanceUniqueName,
+ string ConnectionName,
+ IReadOnlyList TagPaths,
+ DateTimeOffset Timestamp);
diff --git a/src/ScadaLink.Commons/Messages/DataConnection/SubscribeTagsResponse.cs b/src/ScadaLink.Commons/Messages/DataConnection/SubscribeTagsResponse.cs
new file mode 100644
index 0000000..246f4c4
--- /dev/null
+++ b/src/ScadaLink.Commons/Messages/DataConnection/SubscribeTagsResponse.cs
@@ -0,0 +1,11 @@
+namespace ScadaLink.Commons.Messages.DataConnection;
+
+/// <summary>
+/// Response reporting whether a tag subscription was registered; ErrorMessage is optional (nullable).
+/// </summary>
+public record SubscribeTagsResponse(
+ string CorrelationId,
+ string InstanceUniqueName,
+ bool Success,
+ string? ErrorMessage,
+ DateTimeOffset Timestamp);
diff --git a/src/ScadaLink.Commons/Messages/DataConnection/TagValueUpdate.cs b/src/ScadaLink.Commons/Messages/DataConnection/TagValueUpdate.cs
new file mode 100644
index 0000000..7619a03
--- /dev/null
+++ b/src/ScadaLink.Commons/Messages/DataConnection/TagValueUpdate.cs
@@ -0,0 +1,21 @@
+using ScadaLink.Commons.Interfaces.Protocol;
+
+namespace ScadaLink.Commons.Messages.DataConnection;
+
+/// <summary>
+/// Published by DCL to an Instance Actor when a subscribed tag value changes.
+/// </summary>
+public record TagValueUpdate(
+ string ConnectionName,
+ string TagPath,
+ object? Value,
+ QualityCode Quality,
+ DateTimeOffset Timestamp);
+
+/// <summary>
+/// Published by DCL when connection state changes, causing bulk quality updates.
+/// </summary>
+public record ConnectionQualityChanged(
+ string ConnectionName,
+ QualityCode Quality,
+ DateTimeOffset Timestamp);
diff --git a/src/ScadaLink.Commons/Messages/DataConnection/UnsubscribeTagsRequest.cs b/src/ScadaLink.Commons/Messages/DataConnection/UnsubscribeTagsRequest.cs
new file mode 100644
index 0000000..f9b1d79
--- /dev/null
+++ b/src/ScadaLink.Commons/Messages/DataConnection/UnsubscribeTagsRequest.cs
@@ -0,0 +1,10 @@
+namespace ScadaLink.Commons.Messages.DataConnection;
+
+/// <summary>
+/// Request from an Instance Actor to unsubscribe from all its tags when stopping (no tag list is carried).
+/// </summary>
+public record UnsubscribeTagsRequest(
+ string CorrelationId,
+ string InstanceUniqueName,
+ string ConnectionName,
+ DateTimeOffset Timestamp);
diff --git a/src/ScadaLink.Commons/Messages/DataConnection/WriteTagRequest.cs b/src/ScadaLink.Commons/Messages/DataConnection/WriteTagRequest.cs
new file mode 100644
index 0000000..b00e19e
--- /dev/null
+++ b/src/ScadaLink.Commons/Messages/DataConnection/WriteTagRequest.cs
@@ -0,0 +1,21 @@
+namespace ScadaLink.Commons.Messages.DataConnection;
+
+/// <summary>
+/// Request to write a value to a device tag through the DCL.
+/// Write failures are returned synchronously to the calling script.
+/// </summary>
+public record WriteTagRequest(
+ string CorrelationId,
+ string ConnectionName,
+ string TagPath,
+ object? Value,
+ DateTimeOffset Timestamp);
+
+/// <summary>
+/// Response for a device tag write operation; ErrorMessage is optional (nullable).
+/// </summary>
+public record WriteTagResponse(
+ string CorrelationId,
+ bool Success,
+ string? ErrorMessage,
+ DateTimeOffset Timestamp);
diff --git a/src/ScadaLink.Commons/Messages/Integration/IntegrationCallRequest.cs b/src/ScadaLink.Commons/Messages/Integration/IntegrationCallRequest.cs
new file mode 100644
index 0000000..5cd9f1b
--- /dev/null
+++ b/src/ScadaLink.Commons/Messages/Integration/IntegrationCallRequest.cs
@@ -0,0 +1,14 @@
+namespace ScadaLink.Commons.Messages.Integration;
+
+/// <summary>
+/// Request routed from central to site to invoke an integration method
+/// (external system call or notification) on behalf of the central UI or API.
+/// </summary>
+public record IntegrationCallRequest(
+ string CorrelationId,
+ string SiteId,
+ string InstanceUniqueName,
+ string TargetSystemName,
+ string MethodName,
+ IReadOnlyDictionary Parameters,
+ DateTimeOffset Timestamp);
diff --git a/src/ScadaLink.Commons/Messages/Integration/IntegrationCallResponse.cs b/src/ScadaLink.Commons/Messages/Integration/IntegrationCallResponse.cs
new file mode 100644
index 0000000..5969f75
--- /dev/null
+++ b/src/ScadaLink.Commons/Messages/Integration/IntegrationCallResponse.cs
@@ -0,0 +1,12 @@
+namespace ScadaLink.Commons.Messages.Integration;
+
+/// <summary>
+/// Response for an integration call routed through central-site communication; ResultJson and ErrorMessage are nullable.
+/// </summary>
+public record IntegrationCallResponse(
+ string CorrelationId,
+ string SiteId,
+ bool Success,
+ string? ResultJson,
+ string? ErrorMessage,
+ DateTimeOffset Timestamp);
diff --git a/src/ScadaLink.Commons/Messages/RemoteQuery/EventLogQueryRequest.cs b/src/ScadaLink.Commons/Messages/RemoteQuery/EventLogQueryRequest.cs
new file mode 100644
index 0000000..0808fa6
--- /dev/null
+++ b/src/ScadaLink.Commons/Messages/RemoteQuery/EventLogQueryRequest.cs
@@ -0,0 +1,19 @@
+namespace ScadaLink.Commons.Messages.RemoteQuery;
+
+/// <summary>
+/// Request to query site event logs from central.
+/// Supports filtering by event type, severity, instance, time range, and keyword search (all filters are nullable).
+/// Uses keyset pagination via continuation token (last event ID).
+/// </summary>
+public record EventLogQueryRequest(
+ string CorrelationId,
+ string SiteId,
+ DateTimeOffset? From,
+ DateTimeOffset? To,
+ string? EventType,
+ string? Severity,
+ string? InstanceId,
+ string? KeywordFilter,
+ long? ContinuationToken,
+ int PageSize,
+ DateTimeOffset Timestamp);
diff --git a/src/ScadaLink.Commons/Messages/RemoteQuery/EventLogQueryResponse.cs b/src/ScadaLink.Commons/Messages/RemoteQuery/EventLogQueryResponse.cs
new file mode 100644
index 0000000..8bae177
--- /dev/null
+++ b/src/ScadaLink.Commons/Messages/RemoteQuery/EventLogQueryResponse.cs
@@ -0,0 +1,28 @@
+namespace ScadaLink.Commons.Messages.RemoteQuery;
+
+/// <summary>
+/// A single event log entry returned from a site query.
+/// </summary>
+public record EventLogEntry(
+ long Id,
+ DateTimeOffset Timestamp,
+ string EventType,
+ string Severity,
+ string? InstanceId,
+ string Source,
+ string Message,
+ string? Details);
+
+/// <summary>
+/// Response containing paginated event log entries from a site.
+/// Uses keyset pagination: ContinuationToken is the last event ID in the result set.
+/// </summary>
+public record EventLogQueryResponse(
+ string CorrelationId,
+ string SiteId,
+ IReadOnlyList Entries,
+ long? ContinuationToken,
+ bool HasMore,
+ bool Success,
+ string? ErrorMessage,
+ DateTimeOffset Timestamp);
diff --git a/src/ScadaLink.Commons/Messages/RemoteQuery/ParkedMessageQueryRequest.cs b/src/ScadaLink.Commons/Messages/RemoteQuery/ParkedMessageQueryRequest.cs
new file mode 100644
index 0000000..2f26f71
--- /dev/null
+++ b/src/ScadaLink.Commons/Messages/RemoteQuery/ParkedMessageQueryRequest.cs
@@ -0,0 +1,11 @@
+namespace ScadaLink.Commons.Messages.RemoteQuery;
+
+/// <summary>
+/// Request to query one page of parked (permanently failed) store-and-forward messages at a site.
+/// </summary>
+public record ParkedMessageQueryRequest(
+ string CorrelationId,
+ string SiteId,
+ int PageNumber,
+ int PageSize,
+ DateTimeOffset Timestamp);
diff --git a/src/ScadaLink.Commons/Messages/RemoteQuery/ParkedMessageQueryResponse.cs b/src/ScadaLink.Commons/Messages/RemoteQuery/ParkedMessageQueryResponse.cs
new file mode 100644
index 0000000..944b444
--- /dev/null
+++ b/src/ScadaLink.Commons/Messages/RemoteQuery/ParkedMessageQueryResponse.cs
@@ -0,0 +1,24 @@
+namespace ScadaLink.Commons.Messages.RemoteQuery;
+
+/// <summary>
+/// A single parked (permanently failed) store-and-forward message returned from a site query.
+/// </summary>
+public record ParkedMessageEntry(
+ string MessageId,
+ string TargetSystem,
+ string MethodName,
+ string ErrorMessage,
+ int AttemptCount,
+ DateTimeOffset OriginalTimestamp,
+ DateTimeOffset LastAttemptTimestamp);
+
+public record ParkedMessageQueryResponse( // paged reply carrying parked store-and-forward messages from a site
+ string CorrelationId,
+ string SiteId,
+ IReadOnlyList Messages,
+ int TotalCount,
+ int PageNumber, // SiteCommunicationActor's "handler not available" reply copies this from the request
+ int PageSize, // likewise copied from the request in that fallback reply
+ bool Success,
+ string? ErrorMessage,
+ DateTimeOffset Timestamp);
diff --git a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs
new file mode 100644
index 0000000..8b50550
--- /dev/null
+++ b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs
@@ -0,0 +1,172 @@
+using Akka.Actor;
+using Akka.Event;
+using ScadaLink.Commons.Messages.Communication;
+using ScadaLink.Commons.Messages.Health;
+
+namespace ScadaLink.Communication.Actors;
+
+/// <summary>
+/// Central-side actor that routes messages from central to site clusters via Akka remoting.
+/// Maintains a registry of known site actor paths (learned from heartbeats/connection events).
+///
+/// WP-4: All 8 message patterns routed through this actor.
+/// WP-5: Ask timeout on connection drop (no central buffering). Debug streams killed on interruption.
+/// </summary>
+public class CentralCommunicationActor : ReceiveActor
+{
+ private readonly ILoggingAdapter _log = Context.GetLogger();
+
+ /// <summary>
+ /// Maps SiteId → remote SiteCommunicationActor selection.
+ /// Populated by RegisterSite or learned from a heartbeat's sender; removed when the site disconnects.
+ /// </summary>
+ private readonly Dictionary _siteSelections = new();
+
+ /// <summary>
+ /// Tracks active debug view subscriptions: correlationId → (siteId, subscriber).
+ /// Used to kill debug streams on site disconnection (WP-5).
+ /// </summary>
+ private readonly Dictionary _debugSubscriptions = new();
+
+ /// <summary>
+ /// Tracks in-progress deployments: deploymentId → siteId.
+ /// On central failover, in-progress deployments are treated as failed (WP-5).
+ /// </summary>
+ private readonly Dictionary _inProgressDeployments = new();
+
+ public CentralCommunicationActor()
+ {
+ // Site registration via heartbeats
+ Receive(HandleHeartbeat);
+
+ // Connection state changes
+ Receive(HandleConnectionStateChanged);
+
+ // Site registration command (manual or from discovery)
+ Receive(HandleRegisterSite);
+
+ // Route enveloped messages to sites
+ Receive(HandleSiteEnvelope);
+ }
+
+ private void HandleHeartbeat(HeartbeatMessage heartbeat)
+ {
+ // Heartbeats are sent by the site's communication actor, so Sender.Path is its address.
+ // Learn it for unknown sites only (sender-less heartbeats are skipped; RegisterSite entries are kept).
+ if (!_siteSelections.ContainsKey(heartbeat.SiteId) && !Sender.IsNobody())
+ {
+ _siteSelections[heartbeat.SiteId] = Context.ActorSelection(Sender.Path);
+ _log.Info("Learned site {0} from heartbeat at path {1}", heartbeat.SiteId, Sender.Path);
+ }
+
+ // Forward heartbeat to parent/subscribers (central health monitoring)
+ Context.Parent.Tell(heartbeat);
+ }
+
+ private void HandleConnectionStateChanged(ConnectionStateChanged msg)
+ {
+ if (!msg.IsConnected)
+ {
+ _log.Warning("Site {0} disconnected at {1}", msg.SiteId, msg.Timestamp);
+
+ // WP-5: Kill active debug streams for the disconnected site
+ var toRemove = _debugSubscriptions
+ .Where(kvp => kvp.Value.SiteId == msg.SiteId)
+ .ToList();
+
+ foreach (var kvp in toRemove)
+ {
+ _log.Info("Killing debug stream {0} for disconnected site {1}", kvp.Key, msg.SiteId);
+ kvp.Value.Subscriber.Tell(new DebugStreamTerminated(msg.SiteId, kvp.Key));
+ _debugSubscriptions.Remove(kvp.Key);
+ }
+
+ // WP-5: Mark in-progress deployments as failed
+ var failedDeployments = _inProgressDeployments
+ .Where(kvp => kvp.Value == msg.SiteId)
+ .Select(kvp => kvp.Key)
+ .ToList();
+
+ foreach (var deploymentId in failedDeployments)
+ {
+ _log.Warning("Deployment {0} to site {1} treated as failed due to disconnection",
+ deploymentId, msg.SiteId);
+ _inProgressDeployments.Remove(deploymentId);
+ }
+
+ _siteSelections.Remove(msg.SiteId);
+ }
+ else
+ {
+ _log.Info("Site {0} connected at {1}", msg.SiteId, msg.Timestamp);
+ }
+ }
+
+ private void HandleRegisterSite(RegisterSite msg)
+ {
+ var selection = Context.ActorSelection(msg.RemoteActorPath);
+ _siteSelections[msg.SiteId] = selection;
+ _log.Info("Registered site {0} at path {1}", msg.SiteId, msg.RemoteActorPath);
+ }
+
+ private void HandleSiteEnvelope(SiteEnvelope envelope)
+ {
+ if (!_siteSelections.TryGetValue(envelope.SiteId, out var siteSelection))
+ {
+ _log.Warning("No known path for site {0}, cannot route message {1}",
+ envelope.SiteId, envelope.Message.GetType().Name);
+
+ // The Ask will timeout on the caller side — no central buffering (WP-5)
+ return;
+ }
+
+ // Track debug subscriptions and deployments for cleanup on disconnect
+ TrackMessageForCleanup(envelope);
+
+ // Forward the inner message to the site, preserving the original sender
+ // so the site can reply directly to the caller (completing the Ask pattern)
+ siteSelection.Tell(envelope.Message, Sender);
+ }
+
+ private void TrackMessageForCleanup(SiteEnvelope envelope)
+ {
+ switch (envelope.Message)
+ {
+ case Commons.Messages.DebugView.SubscribeDebugViewRequest sub:
+ _debugSubscriptions[sub.CorrelationId] = (envelope.SiteId, Sender);
+ break;
+
+ case Commons.Messages.DebugView.UnsubscribeDebugViewRequest unsub:
+ _debugSubscriptions.Remove(unsub.CorrelationId);
+ break;
+
+ case Commons.Messages.Deployment.DeployInstanceCommand deploy:
+ _inProgressDeployments[deploy.DeploymentId] = envelope.SiteId;
+ break;
+ }
+ }
+
+ protected override void PreStart()
+ {
+ _log.Info("CentralCommunicationActor started");
+ }
+
+ protected override void PostStop()
+ {
+ _log.Info("CentralCommunicationActor stopped. In-progress deployments treated as failed (WP-5).");
+ // On central failover, all in-progress deployments are failed
+ _inProgressDeployments.Clear();
+ _debugSubscriptions.Clear();
+ }
+}
+
+/// <summary>
+/// Command to register a site's remote communication actor path.
+/// </summary>
+public record RegisterSite(string SiteId, string RemoteActorPath);
+
+/// <summary>
+/// Notification sent to debug view subscribers when the stream is terminated
+/// due to site disconnection (WP-5).
+/// </summary>
+public record DebugStreamTerminated(string SiteId, string CorrelationId);
diff --git a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs
new file mode 100644
index 0000000..be71438
--- /dev/null
+++ b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs
@@ -0,0 +1,212 @@
+using Akka.Actor;
+using Akka.Event;
+using ScadaLink.Commons.Messages.Artifacts;
+using ScadaLink.Commons.Messages.DebugView;
+using ScadaLink.Commons.Messages.Deployment;
+using ScadaLink.Commons.Messages.Health;
+using ScadaLink.Commons.Messages.Integration;
+using ScadaLink.Commons.Messages.Lifecycle;
+using ScadaLink.Commons.Messages.RemoteQuery;
+
+namespace ScadaLink.Communication.Actors;
+
+/// <summary>
+/// Site-side actor that receives messages from central via Akka remoting and routes
+/// them to the appropriate local actors. Also sends heartbeats and health reports
+/// to central.
+///
+/// WP-4: Routes all 8 message patterns to local handlers.
+/// </summary>
+public class SiteCommunicationActor : ReceiveActor, IWithTimers
+{
+ private readonly ILoggingAdapter _log = Context.GetLogger();
+ private readonly string _siteId;
+ private readonly CommunicationOptions _options;
+
+ /// <summary>
+ /// Reference to the local Deployment Manager singleton proxy.
+ /// </summary>
+ private readonly IActorRef _deploymentManagerProxy;
+
+ /// <summary>
+ /// Optional reference to the central communication actor for sending heartbeats/health.
+ /// Set via RegisterCentralPath message; null until then, so heartbeats/health are not sent.
+ /// </summary>
+ private ActorSelection? _centralSelection;
+
+ /// <summary>
+ /// Local actor references for routing specific message patterns.
+ /// Populated via RegisterLocalHandler messages; while null, requests get a failure reply.
+ /// </summary>
+ private IActorRef? _eventLogHandler;
+ private IActorRef? _parkedMessageHandler;
+ private IActorRef? _integrationHandler;
+ private IActorRef? _artifactHandler;
+
+ public ITimerScheduler Timers { get; set; } = null!;
+
+ public SiteCommunicationActor(
+ string siteId,
+ CommunicationOptions options,
+ IActorRef deploymentManagerProxy)
+ {
+ _siteId = siteId;
+ _options = options;
+ _deploymentManagerProxy = deploymentManagerProxy;
+
+ // Registration
+ Receive(HandleRegisterCentral);
+ Receive(HandleRegisterLocalHandler);
+
+ // Pattern 1: Instance Deployment — forward to Deployment Manager
+ Receive(msg =>
+ {
+ _log.Debug("Routing DeployInstanceCommand for {0} to DeploymentManager", msg.InstanceUniqueName);
+ _deploymentManagerProxy.Forward(msg);
+ });
+
+ // Pattern 2: Lifecycle — forward to Deployment Manager
+ Receive(msg => _deploymentManagerProxy.Forward(msg));
+ Receive(msg => _deploymentManagerProxy.Forward(msg));
+ Receive(msg => _deploymentManagerProxy.Forward(msg));
+
+ // Pattern 3: Artifact Deployment — forward to artifact handler if registered
+ Receive(msg =>
+ {
+ if (_artifactHandler != null)
+ _artifactHandler.Forward(msg);
+ else
+ {
+ _log.Warning("No artifact handler registered, replying with failure");
+ Sender.Tell(new ArtifactDeploymentResponse(
+ msg.DeploymentId, _siteId, false, "Artifact handler not available", DateTimeOffset.UtcNow));
+ }
+ });
+
+ // Pattern 4: Integration Routing — forward to integration handler, else reply with a failure response
+ Receive(msg =>
+ {
+ if (_integrationHandler != null)
+ _integrationHandler.Forward(msg);
+ else
+ {
+ Sender.Tell(new IntegrationCallResponse(
+ msg.CorrelationId, _siteId, false, null, "Integration handler not available", DateTimeOffset.UtcNow));
+ }
+ });
+
+ // Pattern 5: Debug View — forward to Deployment Manager (which routes to Instance Actor)
+ Receive(msg => _deploymentManagerProxy.Forward(msg));
+ Receive(msg => _deploymentManagerProxy.Forward(msg));
+
+ // Pattern 7: Remote Queries — forward to the registered handler, else reply with an empty failure page
+ Receive(msg =>
+ {
+ if (_eventLogHandler != null)
+ _eventLogHandler.Forward(msg);
+ else
+ {
+ Sender.Tell(new EventLogQueryResponse(
+ msg.CorrelationId, _siteId, [], null, false, false,
+ "Event log handler not available", DateTimeOffset.UtcNow));
+ }
+ });
+
+ Receive(msg =>
+ {
+ if (_parkedMessageHandler != null)
+ _parkedMessageHandler.Forward(msg);
+ else
+ {
+ Sender.Tell(new ParkedMessageQueryResponse(
+ msg.CorrelationId, _siteId, [], 0, msg.PageNumber, msg.PageSize, false,
+ "Parked message handler not available", DateTimeOffset.UtcNow));
+ }
+ });
+
+ // Internal: periodic timer tick (scheduled in PreStart) — send a heartbeat to central
+ Receive(_ => SendHeartbeatToCentral());
+
+ // Internal: forward health report to central (silently dropped while no central path is registered)
+ Receive(msg =>
+ {
+ _centralSelection?.Tell(msg, Self);
+ });
+ }
+
+ protected override void PreStart()
+ {
+ _log.Info("SiteCommunicationActor started for site {0}", _siteId);
+
+ // Schedule periodic heartbeat to central (period comes from TransportHeartbeatInterval)
+ Timers.StartPeriodicTimer(
+ "heartbeat",
+ new SendHeartbeat(),
+ TimeSpan.FromSeconds(1), // initial delay
+ _options.TransportHeartbeatInterval);
+ }
+
+ private void HandleRegisterCentral(RegisterCentralPath msg)
+ {
+ _centralSelection = Context.ActorSelection(msg.CentralActorPath);
+ _log.Info("Registered central communication path: {0}", msg.CentralActorPath);
+ }
+
+ private void HandleRegisterLocalHandler(RegisterLocalHandler msg)
+ {
+ switch (msg.HandlerType)
+ {
+ case LocalHandlerType.EventLog:
+ _eventLogHandler = msg.Handler;
+ break;
+ case LocalHandlerType.ParkedMessages:
+ _parkedMessageHandler = msg.Handler;
+ break;
+ case LocalHandlerType.Integration:
+ _integrationHandler = msg.Handler;
+ break;
+ case LocalHandlerType.Artifacts:
+ _artifactHandler = msg.Handler;
+ break;
+ }
+
+ _log.Info("Registered local handler for {0}", msg.HandlerType);
+ }
+
+ private void SendHeartbeatToCentral()
+ {
+ if (_centralSelection == null)
+ return;
+
+ var hostname = Environment.MachineName;
+ var heartbeat = new HeartbeatMessage(
+ _siteId,
+ hostname,
+ IsActive: true,
+ DateTimeOffset.UtcNow);
+
+ _centralSelection.Tell(heartbeat, Self);
+ }
+
+ // ── Internal messages ──
+
+ internal record SendHeartbeat;
+}
+
+/// <summary>
+/// Command to register the central communication actor path for outbound messages.
+/// </summary>
+public record RegisterCentralPath(string CentralActorPath);
+
+/// <summary>
+/// Command to register a local actor as a handler for a specific message pattern.
+/// </summary>
+public record RegisterLocalHandler(LocalHandlerType HandlerType, IActorRef Handler);
+
+public enum LocalHandlerType
+{
+ EventLog, // handler stored in SiteCommunicationActor._eventLogHandler
+ ParkedMessages, // handler stored in SiteCommunicationActor._parkedMessageHandler
+ Integration, // handler stored in SiteCommunicationActor._integrationHandler
+ Artifacts // handler stored in SiteCommunicationActor._artifactHandler
+}
diff --git a/src/ScadaLink.Communication/CommunicationOptions.cs b/src/ScadaLink.Communication/CommunicationOptions.cs
index f68cefc..d13945b 100644
--- a/src/ScadaLink.Communication/CommunicationOptions.cs
+++ b/src/ScadaLink.Communication/CommunicationOptions.cs
@@ -1,10 +1,35 @@
namespace ScadaLink.Communication;
+/// <summary>
+/// Configuration options for central-site communication, including per-pattern
+/// timeouts and transport heartbeat settings.
+/// </summary>
public class CommunicationOptions
{
+ /// <summary>Timeout for deployment commands (typically longest due to apply logic).</summary>
public TimeSpan DeploymentTimeout { get; set; } = TimeSpan.FromMinutes(2);
+
+ /// <summary>Timeout for lifecycle commands (disable, enable, delete).</summary>
public TimeSpan LifecycleTimeout { get; set; } = TimeSpan.FromSeconds(30);
+
+ /// <summary>Timeout for artifact deployment commands.</summary>
+ public TimeSpan ArtifactDeploymentTimeout { get; set; } = TimeSpan.FromMinutes(1);
+
+ /// <summary>Timeout for remote query requests (event logs, parked messages).</summary>
public TimeSpan QueryTimeout { get; set; } = TimeSpan.FromSeconds(30);
+
+ /// <summary>Timeout for integration call routing.</summary>
+ public TimeSpan IntegrationTimeout { get; set; } = TimeSpan.FromSeconds(30);
+
+ /// <summary>Timeout for debug view subscribe/unsubscribe handshake.</summary>
+ public TimeSpan DebugViewTimeout { get; set; } = TimeSpan.FromSeconds(10);
+
+ /// <summary>Timeout for health report acknowledgement (fire-and-forget, but bounded).</summary>
+ public TimeSpan HealthReportTimeout { get; set; } = TimeSpan.FromSeconds(10);
+
+ /// <summary>Akka.Remote transport heartbeat interval; SiteCommunicationActor also uses it as the period of its heartbeat timer.</summary>
public TimeSpan TransportHeartbeatInterval { get; set; } = TimeSpan.FromSeconds(5);
+
+ /// <summary>Akka.Remote transport failure detection threshold.</summary>
public TimeSpan TransportFailureThreshold { get; set; } = TimeSpan.FromSeconds(15);
}
diff --git a/src/ScadaLink.Communication/CommunicationService.cs b/src/ScadaLink.Communication/CommunicationService.cs
new file mode 100644
index 0000000..47448d1
--- /dev/null
+++ b/src/ScadaLink.Communication/CommunicationService.cs
@@ -0,0 +1,152 @@
+using Akka.Actor;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using ScadaLink.Commons.Messages.Artifacts;
+using ScadaLink.Commons.Messages.DebugView;
+using ScadaLink.Commons.Messages.Deployment;
+using ScadaLink.Commons.Messages.Health;
+using ScadaLink.Commons.Messages.Integration;
+using ScadaLink.Commons.Messages.Lifecycle;
+using ScadaLink.Commons.Messages.RemoteQuery;
+
+namespace ScadaLink.Communication;
+
+/// <summary>
+/// Central-side service that wraps the Akka Ask pattern with per-pattern timeouts.
+/// Provides a typed API for sending messages to sites and awaiting responses.
+/// On connection drop, the ask times out (no central buffering per design).
+/// </summary>
+public class CommunicationService
+{
+ private readonly CommunicationOptions _options;
+ private readonly ILogger _logger;
+ private IActorRef? _centralCommunicationActor;
+
+ public CommunicationService(
+ IOptions options,
+ ILogger logger)
+ {
+ _options = options.Value;
+ _logger = logger;
+ }
+
+ /// <summary>
+ /// Sets the central communication actor reference. Called during actor system startup; until then every send method throws InvalidOperationException.
+ /// </summary>
+ public void SetCommunicationActor(IActorRef centralCommunicationActor)
+ {
+ _centralCommunicationActor = centralCommunicationActor;
+ }
+
+ private IActorRef GetActor()
+ {
+ return _centralCommunicationActor
+ ?? throw new InvalidOperationException("CommunicationService not initialized. CentralCommunicationActor not set.");
+ }
+
+ // ── Pattern 1: Instance Deployment ──
+
+ public async Task DeployInstanceAsync(
+ string siteId, DeployInstanceCommand command, CancellationToken cancellationToken = default)
+ {
+ _logger.LogDebug(
+ "Sending DeployInstanceCommand to site {SiteId}, instance={Instance}, correlationId={DeploymentId}",
+ siteId, command.InstanceUniqueName, command.DeploymentId);
+
+ var envelope = new SiteEnvelope(siteId, command);
+ return await GetActor().Ask(
+ envelope, _options.DeploymentTimeout, cancellationToken);
+ }
+
+ // ── Pattern 2: Lifecycle ──
+
+ public async Task DisableInstanceAsync(
+ string siteId, DisableInstanceCommand command, CancellationToken cancellationToken = default)
+ {
+ var envelope = new SiteEnvelope(siteId, command);
+ return await GetActor().Ask(
+ envelope, _options.LifecycleTimeout, cancellationToken);
+ }
+
+ public async Task EnableInstanceAsync(
+ string siteId, EnableInstanceCommand command, CancellationToken cancellationToken = default)
+ {
+ var envelope = new SiteEnvelope(siteId, command);
+ return await GetActor().Ask(
+ envelope, _options.LifecycleTimeout, cancellationToken);
+ }
+
+ public async Task DeleteInstanceAsync(
+ string siteId, DeleteInstanceCommand command, CancellationToken cancellationToken = default)
+ {
+ var envelope = new SiteEnvelope(siteId, command);
+ return await GetActor().Ask(
+ envelope, _options.LifecycleTimeout, cancellationToken);
+ }
+
+ // ── Pattern 3: Artifact Deployment ──
+
+ public async Task DeployArtifactsAsync(
+ string siteId, DeployArtifactsCommand command, CancellationToken cancellationToken = default)
+ {
+ var envelope = new SiteEnvelope(siteId, command);
+ return await GetActor().Ask(
+ envelope, _options.ArtifactDeploymentTimeout, cancellationToken);
+ }
+
+ // ── Pattern 4: Integration Routing ──
+
+ public async Task RouteIntegrationCallAsync(
+ string siteId, IntegrationCallRequest request, CancellationToken cancellationToken = default)
+ {
+ var envelope = new SiteEnvelope(siteId, request);
+ return await GetActor().Ask(
+ envelope, _options.IntegrationTimeout, cancellationToken);
+ }
+
+ // ── Pattern 5: Debug View ──
+
+ public async Task SubscribeDebugViewAsync(
+ string siteId, SubscribeDebugViewRequest request, CancellationToken cancellationToken = default)
+ {
+ var envelope = new SiteEnvelope(siteId, request);
+ return await GetActor().Ask(
+ envelope, _options.DebugViewTimeout, cancellationToken);
+ }
+
+ public void UnsubscribeDebugView(string siteId, UnsubscribeDebugViewRequest request)
+ {
+ // Tell (fire-and-forget) — no response expected, so no timeout option applies here
+ GetActor().Tell(new SiteEnvelope(siteId, request));
+ }
+
+ // ── Pattern 6: Health Reporting (site→central, Tell) ──
+ // Health reports are received by central, not sent. No method needed here.
+
+ // ── Pattern 7: Remote Queries ──
+
+ public async Task QueryEventLogsAsync(
+ string siteId, EventLogQueryRequest request, CancellationToken cancellationToken = default)
+ {
+ var envelope = new SiteEnvelope(siteId, request);
+ return await GetActor().Ask(
+ envelope, _options.QueryTimeout, cancellationToken);
+ }
+
+ public async Task QueryParkedMessagesAsync(
+ string siteId, ParkedMessageQueryRequest request, CancellationToken cancellationToken = default)
+ {
+ var envelope = new SiteEnvelope(siteId, request);
+ return await GetActor().Ask(
+ envelope, _options.QueryTimeout, cancellationToken);
+ }
+
+ // ── Pattern 8: Heartbeat (site→central, Tell) ──
+ // Heartbeats are received by central, not sent. No method needed here.
+}
+
+/// <summary>
+/// Envelope that wraps any message with a target site ID for routing.
+/// Used by CentralCommunicationActor to resolve the site actor path.
+/// </summary>
+public record SiteEnvelope(string SiteId, object Message);
diff --git a/src/ScadaLink.Communication/ScadaLink.Communication.csproj b/src/ScadaLink.Communication/ScadaLink.Communication.csproj
index 049c7d9..a41b756 100644
--- a/src/ScadaLink.Communication/ScadaLink.Communication.csproj
+++ b/src/ScadaLink.Communication/ScadaLink.Communication.csproj
@@ -8,8 +8,14 @@
+
+
+
+
+
+
diff --git a/src/ScadaLink.Communication/ServiceCollectionExtensions.cs b/src/ScadaLink.Communication/ServiceCollectionExtensions.cs
index da65a9e..e6f0021 100644
--- a/src/ScadaLink.Communication/ServiceCollectionExtensions.cs
+++ b/src/ScadaLink.Communication/ServiceCollectionExtensions.cs
@@ -6,13 +6,18 @@ public static class ServiceCollectionExtensions
{
public static IServiceCollection AddCommunication(this IServiceCollection services)
{
- // Phase 0: skeleton only
+ services.AddOptions()
+ .BindConfiguration("Communication"); // options are bound from the "Communication" configuration section
+
+ services.AddSingleton();
+
return services;
}
public static IServiceCollection AddCommunicationActors(this IServiceCollection services)
{
- // Phase 0: placeholder for Akka actor registration
+ // Intentionally registers nothing: actor registration happens in AkkaHostedService.RegisterCentralActors/RegisterSiteActors.
+ // This method is kept as a hook for any additional DI registrations needed by the communication actors.
return services;
}
}
diff --git a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs
new file mode 100644
index 0000000..b668a4d
--- /dev/null
+++ b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs
@@ -0,0 +1,476 @@
+using Akka.Actor;
+using Akka.Event;
+using ScadaLink.Commons.Interfaces.Protocol;
+using ScadaLink.Commons.Messages.DataConnection;
+using ScadaLink.Commons.Types.Enums;
+
+namespace ScadaLink.DataConnectionLayer.Actors;
+
+/// <summary>
+/// WP-6: Connection actor using Akka.NET Become/Stash pattern for lifecycle state machine.
+///
+/// States:
+/// - Connecting: stash subscribe/write requests; attempts connection
+/// - Connected: unstash and process all requests
+/// - Reconnecting: push bad quality for all subscribed tags, stash new requests,
+/// fixed-interval reconnect
+///
+/// WP-9: Auto-reconnect with bad quality on disconnect.
+/// WP-10: Transparent re-subscribe after reconnection.
+/// WP-11: Write-back support (synchronous failure to caller, no S&amp;F).
+/// WP-12: Tag path resolution with retry.
+/// WP-13: Health reporting (connection status + tag resolution counts).
+/// WP-14: Subscription lifecycle (register on create, cleanup on stop).
+/// </summary>
+public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
+{
+ private readonly ILoggingAdapter _log = Context.GetLogger();
+ private readonly string _connectionName;
+ private readonly IDataConnection _adapter;
+ private readonly DataConnectionOptions _options;
+
+ public IStash Stash { get; set; } = null!;
+ public ITimerScheduler Timers { get; set; } = null!;
+
+    /// <summary>
+    /// Active subscriptions: instanceUniqueName → set of tag paths.
+    /// </summary>
+    private readonly Dictionary<string, HashSet<string>> _subscriptionsByInstance = new();
+
+    /// <summary>
+    /// Subscription IDs returned by the adapter: tagPath → subscriptionId.
+    /// </summary>
+    private readonly Dictionary<string, string> _subscriptionIds = new();
+
+    /// <summary>
+    /// Tags whose path resolution failed and are awaiting retry.
+    /// </summary>
+    private readonly HashSet<string> _unresolvedTags = new();
+
+    /// <summary>
+    /// Subscribers: instanceUniqueName → IActorRef (the Instance Actor).
+    /// </summary>
+    private readonly Dictionary<string, IActorRef> _subscribers = new();
+
+    /// <summary>
+    /// Count of subscribed tags, reported as TotalSubscribedTags in the health report.
+    /// </summary>
+    private int _totalSubscribed;
+
+    /// <summary>
+    /// Count of tags with an adapter subscription, reported as ResolvedTags in the health report.
+    /// </summary>
+    private int _resolvedTags;
+
+    /// <summary>
+    /// Creates the actor for one named data connection.
+    /// </summary>
+    /// <param name="connectionName">Name used in log messages and outgoing reports.</param>
+    /// <param name="adapter">Protocol adapter this actor drives.</param>
+    /// <param name="options">Supplies the reconnect and tag-resolution retry intervals.</param>
+    public DataConnectionActor(
+        string connectionName,
+        IDataConnection adapter,
+        DataConnectionOptions options)
+        => (_connectionName, _adapter, _options) = (connectionName, adapter, options);
+
+    /// <summary>Starts the state machine in the Connecting state.</summary>
+    protected override void PreStart()
+    {
+        _log.Info("DataConnectionActor [{0}] starting in Connecting state", _connectionName);
+        BecomeConnecting();
+    }
+
+    /// <summary>
+    /// Disposes the adapter without blocking the actor thread. A failed dispose is logged
+    /// instead of being left as an unobserved task exception.
+    /// </summary>
+    protected override void PostStop()
+    {
+        _log.Info("DataConnectionActor [{0}] stopping — disposing adapter", _connectionName);
+
+        // NOTE(review): the parent supervises with Restart and passes one adapter instance via
+        // Props; if a restart runs PostStop, the restarted actor reuses a disposed adapter — confirm.
+        var log = _log;
+        var name = _connectionName;
+        _ = _adapter.DisposeAsync().AsTask().ContinueWith(
+            t => log.Warning(t.Exception!.GetBaseException(), "DataConnectionActor [{0}] adapter dispose failed", name),
+            TaskContinuationOptions.OnlyOnFaulted);
+    }
+
+    /// <summary>Fallback receive; each state installs its own handler through Become.</summary>
+    protected override void OnReceive(object message)
+    {
+        // Default handler — should not be reached due to Become
+        Unhandled(message);
+    }
+
+ // ── Connecting State ──
+
+    /// <summary>
+    /// Switches to the Connecting behavior and queues the first connection attempt.
+    /// </summary>
+    private void BecomeConnecting()
+    {
+        _log.Info("[{0}] Entering Connecting state", _connectionName);
+        Become(Connecting);
+        Self.Tell(new AttemptConnect());
+    }
+
+    /// <summary>
+    /// Behavior while the first connection is being established: connection attempts and
+    /// their results are processed, health queries are answered, and subscribe, write and
+    /// unsubscribe requests are stashed.
+    /// </summary>
+    private void Connecting(object message)
+    {
+        if (message is AttemptConnect)
+        {
+            HandleAttemptConnect();
+        }
+        else if (message is ConnectResult outcome)
+        {
+            HandleConnectResult(outcome);
+        }
+        else if (message is SubscribeTagsRequest or WriteTagRequest or UnsubscribeTagsRequest)
+        {
+            Stash.Stash();
+        }
+        else if (message is GetHealthReport)
+        {
+            ReplyWithHealthReport();
+        }
+        else
+        {
+            Unhandled(message);
+        }
+    }
+
+ // ── Connected State ──
+
+    /// <summary>
+    /// Switches to the Connected behavior and replays every stashed request.
+    /// </summary>
+    private void BecomeConnected()
+    {
+        _log.Info("[{0}] Entering Connected state", _connectionName);
+        Become(Connected);
+        Stash.UnstashAll();
+    }
+
+    /// <summary>
+    /// Behavior while the adapter is connected. Besides the external requests, this also
+    /// handles the internal messages that adapter callbacks and piped tasks send to Self
+    /// (tag values and tag-resolution outcomes); without these cases they would be dropped
+    /// as unhandled.
+    /// </summary>
+    private void Connected(object message)
+    {
+        switch (message)
+        {
+            case SubscribeTagsRequest req:
+                HandleSubscribe(req);
+                break;
+            case UnsubscribeTagsRequest req:
+                HandleUnsubscribe(req);
+                break;
+            case WriteTagRequest req:
+                HandleWrite(req);
+                break;
+            case AdapterDisconnected:
+                HandleDisconnect();
+                break;
+            case RetryTagResolution:
+                HandleRetryTagResolution();
+                break;
+            case TagValueReceived update:
+                HandleTagValueReceived(update);
+                break;
+            case TagResolutionSucceeded resolved:
+                HandleTagResolutionSucceeded(resolved);
+                break;
+            case TagResolutionFailed failed:
+                HandleTagResolutionFailed(failed);
+                break;
+            case GetHealthReport:
+                ReplyWithHealthReport();
+                break;
+            default:
+                Unhandled(message);
+                break;
+        }
+    }
+
+    /// <summary>
+    /// Records an adapter subscription produced by a retry or a post-reconnect re-subscribe.
+    /// </summary>
+    private void HandleTagResolutionSucceeded(TagResolutionSucceeded msg)
+    {
+        var stillWanted = _subscriptionsByInstance.Values.Any(tags => tags.Contains(msg.TagPath));
+        if (!stillWanted || _subscriptionIds.ContainsKey(msg.TagPath))
+        {
+            // Stale (everyone unsubscribed meanwhile) or duplicate (an earlier attempt already
+            // succeeded): release the adapter-side subscription instead of tracking it.
+            if (!stillWanted)
+                _unresolvedTags.Remove(msg.TagPath);
+            _ = _adapter.UnsubscribeAsync(msg.SubscriptionId);
+            return;
+        }
+
+        _subscriptionIds[msg.TagPath] = msg.SubscriptionId;
+        _unresolvedTags.Remove(msg.TagPath);
+        _resolvedTags++;
+
+        if (_unresolvedTags.Count == 0)
+            Timers.Cancel("tag-resolution-retry");
+    }
+
+    /// <summary>
+    /// Marks a tag as unresolved and makes sure the periodic retry timer is running.
+    /// </summary>
+    private void HandleTagResolutionFailed(TagResolutionFailed msg)
+    {
+        _log.Debug("[{0}] Tag resolution failed for {1}: {2}", _connectionName, msg.TagPath, msg.Error);
+
+        // Another attempt already resolved this tag; nothing to retry.
+        if (_subscriptionIds.ContainsKey(msg.TagPath))
+            return;
+
+        // No instance wants the tag any more; stop retrying it.
+        if (!_subscriptionsByInstance.Values.Any(tags => tags.Contains(msg.TagPath)))
+        {
+            _unresolvedTags.Remove(msg.TagPath);
+            return;
+        }
+
+        _unresolvedTags.Add(msg.TagPath);
+        if (!Timers.IsTimerActive("tag-resolution-retry"))
+        {
+            Timers.StartPeriodicTimer(
+                "tag-resolution-retry",
+                new RetryTagResolution(),
+                _options.TagResolutionRetryInterval,
+                _options.TagResolutionRetryInterval);
+        }
+    }
+
+ // ── Reconnecting State ──
+
+    /// <summary>
+    /// Switches to the Reconnecting behavior, notifies subscribers of bad quality and
+    /// schedules a single reconnect attempt after the configured interval.
+    /// </summary>
+    private void BecomeReconnecting()
+    {
+        _log.Warning("[{0}] Entering Reconnecting state", _connectionName);
+        Become(Reconnecting);
+
+        // WP-9: subscribers learn about the outage before the reconnect timer is armed.
+        PushBadQualityForAllTags();
+
+        var delay = _options.ReconnectInterval;
+        Timers.StartSingleTimer("reconnect", new AttemptConnect(), delay);
+    }
+
+    /// <summary>
+    /// Behavior while the connection is being re-established: subscribe and write requests
+    /// are stashed, unsubscribes are processed immediately, health queries are answered.
+    /// </summary>
+    private void Reconnecting(object message)
+    {
+        if (message is AttemptConnect)
+        {
+            HandleAttemptConnect();
+            return;
+        }
+
+        if (message is ConnectResult outcome)
+        {
+            HandleReconnectResult(outcome);
+            return;
+        }
+
+        if (message is SubscribeTagsRequest or WriteTagRequest)
+        {
+            Stash.Stash();
+            return;
+        }
+
+        if (message is UnsubscribeTagsRequest unsubscribe)
+        {
+            // Allow unsubscribe even during reconnect (for cleanup on instance stop)
+            HandleUnsubscribe(unsubscribe);
+            return;
+        }
+
+        if (message is GetHealthReport)
+        {
+            ReplyWithHealthReport();
+            return;
+        }
+
+        Unhandled(message);
+    }
+
+ // ── Connection Management ──
+
+    /// <summary>
+    /// Starts an asynchronous connection attempt and pipes the outcome back to this actor
+    /// as a <see cref="ConnectResult"/>.
+    /// </summary>
+    private void HandleAttemptConnect()
+    {
+        _log.Debug("[{0}] Attempting connection...", _connectionName);
+        var self = Self;
+
+        // NOTE(review): an empty details dictionary is passed, so the adapter only sees its own
+        // defaults here — confirm the adapter gets its real settings elsewhere (e.g. the factory).
+        _adapter.ConnectAsync(new Dictionary<string, string>()).ContinueWith(t =>
+        {
+            if (t.IsCompletedSuccessfully)
+                return new ConnectResult(true, null);
+
+            // A cancelled task carries no exception; still report a readable reason.
+            var reason = t.Exception?.GetBaseException().Message ?? "Connection attempt was cancelled";
+            return new ConnectResult(false, reason);
+        }).PipeTo(self);
+    }
+
+    /// <summary>
+    /// Handles the outcome of an initial connection attempt: on success enter Connected,
+    /// otherwise schedule another attempt after the reconnect interval.
+    /// </summary>
+    private void HandleConnectResult(ConnectResult result)
+    {
+        if (!result.Success)
+        {
+            _log.Warning("[{0}] Connection failed: {1}. Retrying in {2}s",
+                _connectionName, result.Error, _options.ReconnectInterval.TotalSeconds);
+            Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
+            return;
+        }
+
+        _log.Info("[{0}] Connection established", _connectionName);
+        BecomeConnected();
+    }
+
+    /// <summary>
+    /// Handles the outcome of a reconnect attempt: on success re-subscribe all tags and
+    /// enter Connected, otherwise schedule another attempt after the reconnect interval.
+    /// </summary>
+    private void HandleReconnectResult(ConnectResult result)
+    {
+        if (!result.Success)
+        {
+            _log.Warning("[{0}] Reconnect failed: {1}. Retrying in {2}s",
+                _connectionName, result.Error, _options.ReconnectInterval.TotalSeconds);
+            Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
+            return;
+        }
+
+        _log.Info("[{0}] Reconnected successfully", _connectionName);
+
+        // WP-10: subscriptions are re-issued before stashed requests are replayed.
+        ReSubscribeAll();
+        BecomeConnected();
+    }
+
+    /// <summary>Reacts to an adapter-reported disconnect by entering Reconnecting.</summary>
+    private void HandleDisconnect()
+    {
+        _log.Warning("[{0}] Adapter reported disconnect", _connectionName);
+        BecomeReconnecting();
+    }
+
+ // ── Subscription Management (WP-14) ──
+
+    /// <summary>
+    /// Registers the sender as the subscriber for an instance and subscribes each requested
+    /// tag on the adapter. Tags that fail are marked unresolved and retried periodically.
+    /// A <see cref="SubscribeTagsResponse"/> is sent to the requester when all tags are processed.
+    /// </summary>
+    /// <remarks>
+    /// The async work runs through ActorTaskScheduler.RunTask so that continuations execute
+    /// in the actor's context while further messages wait. The earlier Task.Run version
+    /// mutated actor state from a thread-pool thread and checked for unresolved tags before
+    /// any subscription had been attempted.
+    /// </remarks>
+    private void HandleSubscribe(SubscribeTagsRequest request)
+    {
+        _log.Debug("[{0}] Subscribing {1} tags for instance {2}",
+            _connectionName, request.TagPaths.Count, request.InstanceUniqueName);
+
+        _subscribers[request.InstanceUniqueName] = Sender;
+
+        if (!_subscriptionsByInstance.TryGetValue(request.InstanceUniqueName, out var instanceTags))
+        {
+            instanceTags = new HashSet<string>();
+            _subscriptionsByInstance[request.InstanceUniqueName] = instanceTags;
+        }
+
+        var self = Self;
+        var sender = Sender;
+
+        Akka.Dispatch.ActorTaskScheduler.RunTask(async () =>
+        {
+            foreach (var tagPath in request.TagPaths)
+            {
+                // A tag that is already subscribed or already awaiting retry only needs to be
+                // tracked for this instance; counting it again would skew the health report.
+                var alreadyKnown = _subscriptionIds.ContainsKey(tagPath) || _unresolvedTags.Contains(tagPath);
+                instanceTags.Add(tagPath);
+                if (alreadyKnown)
+                    continue;
+
+                _totalSubscribed++;
+                try
+                {
+                    var subId = await _adapter.SubscribeAsync(tagPath, (path, value) =>
+                    {
+                        self.Tell(new TagValueReceived(path, value));
+                    });
+                    _subscriptionIds[tagPath] = subId;
+                    _resolvedTags++;
+                }
+                catch (Exception ex)
+                {
+                    // WP-12: Tag path resolution failure — mark as unresolved, retry later
+                    _unresolvedTags.Add(tagPath);
+                    self.Tell(new TagResolutionFailed(tagPath, ex.Message));
+                }
+            }
+
+            // Start tag resolution retry timer if we have unresolved tags
+            if (_unresolvedTags.Count > 0)
+            {
+                Timers.StartPeriodicTimer(
+                    "tag-resolution-retry",
+                    new RetryTagResolution(),
+                    _options.TagResolutionRetryInterval,
+                    _options.TagResolutionRetryInterval);
+            }
+
+            sender.Tell(
+                new SubscribeTagsResponse(
+                    request.CorrelationId, request.InstanceUniqueName, true, null, DateTimeOffset.UtcNow),
+                ActorRefs.NoSender);
+        });
+    }
+
+    /// <summary>
+    /// Removes every subscription held by an instance. A tag is released on the adapter only
+    /// when no other instance still references it.
+    /// </summary>
+    /// <remarks>
+    /// Resolved and unresolved tags are accounted for separately. Previously the resolved
+    /// counter was decremented unconditionally (the tag was removed from the unresolved set
+    /// right before the "is it unresolved" check), and unresolved tags were never cleaned up
+    /// because they have no subscription ID.
+    /// </remarks>
+    private void HandleUnsubscribe(UnsubscribeTagsRequest request)
+    {
+        _log.Debug("[{0}] Unsubscribing all tags for instance {1}",
+            _connectionName, request.InstanceUniqueName);
+
+        // Removing the instance first means the check below only sees the remaining instances.
+        if (!_subscriptionsByInstance.Remove(request.InstanceUniqueName, out var tags))
+            return;
+
+        _subscribers.Remove(request.InstanceUniqueName);
+
+        // WP-14: Cleanup on Instance Actor stop
+        foreach (var tagPath in tags)
+        {
+            if (_subscriptionsByInstance.Values.Any(other => other.Contains(tagPath)))
+                continue;
+
+            if (_subscriptionIds.Remove(tagPath, out var subId))
+            {
+                // Resolved tag: release the adapter subscription (fire-and-forget).
+                _ = _adapter.UnsubscribeAsync(subId);
+                _totalSubscribed--;
+                _resolvedTags--;
+            }
+            else if (_unresolvedTags.Remove(tagPath))
+            {
+                // Unresolved tag: nothing exists on the adapter, only the bookkeeping.
+                _totalSubscribed--;
+            }
+        }
+    }
+
+ // ── Write Support (WP-11) ──
+
+    /// <summary>
+    /// WP-11: Writes a value through the adapter and pipes a <see cref="WriteTagResponse"/>
+    /// back to the requester, whether the write succeeded, was rejected, or faulted.
+    /// </summary>
+    private void HandleWrite(WriteTagRequest request)
+    {
+        _log.Debug("[{0}] Writing to tag {1}", _connectionName, request.TagPath);
+        var replyTo = Sender;
+
+        _adapter.WriteAsync(request.TagPath, request.Value)
+            .ContinueWith(completed =>
+            {
+                if (!completed.IsCompletedSuccessfully)
+                {
+                    var error = completed.Exception?.GetBaseException().Message;
+                    return new WriteTagResponse(request.CorrelationId, false, error, DateTimeOffset.UtcNow);
+                }
+
+                var outcome = completed.Result;
+                return new WriteTagResponse(
+                    request.CorrelationId, outcome.Success, outcome.ErrorMessage, DateTimeOffset.UtcNow);
+            })
+            .PipeTo(replyTo);
+    }
+
+ // ── Tag Resolution Retry (WP-12) ──
+
+    /// <summary>
+    /// WP-12: Re-attempts the adapter subscription for every unresolved tag and pipes each
+    /// outcome to Self. Cancels the retry timer when nothing is left to resolve.
+    /// </summary>
+    private void HandleRetryTagResolution()
+    {
+        if (_unresolvedTags.Count == 0)
+        {
+            Timers.Cancel("tag-resolution-retry");
+            return;
+        }
+
+        _log.Debug("[{0}] Retrying resolution for {1} unresolved tags", _connectionName, _unresolvedTags.Count);
+
+        var self = Self;
+
+        // Iterate a snapshot so the set can change while attempts are in flight.
+        foreach (var tagPath in _unresolvedTags.ToList())
+        {
+            var attempt = _adapter.SubscribeAsync(
+                tagPath,
+                (path, value) => self.Tell(new TagValueReceived(path, value)));
+
+            attempt.ContinueWith(done =>
+            {
+                if (done.IsCompletedSuccessfully)
+                    return (object)new TagResolutionSucceeded(tagPath, done.Result);
+
+                var reason = done.Exception?.GetBaseException().Message ?? "Unknown error";
+                return new TagResolutionFailed(tagPath, reason);
+            }).PipeTo(self);
+        }
+    }
+
+ // ── Bad Quality Push (WP-9) ──
+
+    /// <summary>
+    /// WP-9: Sends one bad-quality <see cref="ConnectionQualityChanged"/> notification to the
+    /// subscriber of every instance that has subscriptions, all with the same timestamp.
+    /// </summary>
+    private void PushBadQualityForAllTags()
+    {
+        var timestamp = DateTimeOffset.UtcNow;
+
+        foreach (var instanceName in _subscriptionsByInstance.Keys)
+        {
+            if (_subscribers.TryGetValue(instanceName, out var subscriber))
+            {
+                subscriber.Tell(new ConnectionQualityChanged(_connectionName, QualityCode.Bad, timestamp));
+            }
+        }
+    }
+
+ // ── Re-subscribe (WP-10) ──
+
+    /// <summary>
+    /// WP-10: Forgets all adapter subscription IDs, resets the resolved counter and issues a
+    /// fresh subscription for each previously subscribed tag. Each outcome is piped to Self.
+    /// </summary>
+    private void ReSubscribeAll()
+    {
+        _log.Info("[{0}] Re-subscribing {1} tags after reconnect", _connectionName, _subscriptionIds.Count);
+
+        var self = Self;
+
+        // Snapshot the tag paths before the ID map is cleared.
+        var tagsToRestore = _subscriptionIds.Keys.ToList();
+        _subscriptionIds.Clear();
+        _resolvedTags = 0;
+
+        foreach (var tagPath in tagsToRestore)
+        {
+            var attempt = _adapter.SubscribeAsync(
+                tagPath,
+                (path, value) => self.Tell(new TagValueReceived(path, value)));
+
+            attempt.ContinueWith(done =>
+            {
+                if (done.IsCompletedSuccessfully)
+                    return (object)new TagResolutionSucceeded(tagPath, done.Result);
+
+                var reason = done.Exception?.GetBaseException().Message ?? "Unknown error";
+                return new TagResolutionFailed(tagPath, reason);
+            }).PipeTo(self);
+        }
+    }
+
+ // ── Health Reporting (WP-13) ──
+
+    /// <summary>
+    /// WP-13: Replies to the sender with the adapter status and the current tag counters.
+    /// </summary>
+    private void ReplyWithHealthReport()
+    {
+        var report = new DataConnectionHealthReport(
+            _connectionName,
+            _adapter.Status,
+            _totalSubscribed,
+            _resolvedTags,
+            DateTimeOffset.UtcNow);
+
+        Sender.Tell(report);
+    }
+
+ // ── Internal message handlers for piped async results ──
+
+    /// <summary>
+    /// Forwards a received tag value as a <see cref="TagValueUpdate"/> to the subscriber of
+    /// every instance whose tag set contains the tag path.
+    /// </summary>
+    private void HandleTagValueReceived(TagValueReceived msg)
+    {
+        var interested = _subscriptionsByInstance.Where(entry => entry.Value.Contains(msg.TagPath));
+
+        foreach (var entry in interested)
+        {
+            if (!_subscribers.TryGetValue(entry.Key, out var subscriber))
+                continue;
+
+            subscriber.Tell(new TagValueUpdate(
+                _connectionName, msg.TagPath, msg.Value.Value, msg.Value.Quality, msg.Value.Timestamp));
+        }
+    }
+
+ // ── Internal messages ──
+
+ // Self-message (sent directly or by the "reconnect" timer) that triggers a connection attempt.
+ internal record AttemptConnect;
+ // Outcome of a connection attempt, piped to Self; Error carries the failure message, if any.
+ internal record ConnectResult(bool Success, string? Error);
+ // Handled in the Connected state by moving to Reconnecting. No sender is visible in this file.
+ internal record AdapterDisconnected;
+ // Sent to Self from the adapter subscription callback with the tag path and its new value.
+ internal record TagValueReceived(string TagPath, TagValue Value);
+ // Sent to Self when an adapter subscription attempt for a tag failed.
+ internal record TagResolutionFailed(string TagPath, string Error);
+ // Sent to Self when a retried or re-issued adapter subscription succeeded.
+ internal record TagResolutionSucceeded(string TagPath, string SubscriptionId);
+ // Periodic self-message from the "tag-resolution-retry" timer.
+ internal record RetryTagResolution;
+ // Request for a DataConnectionHealthReport; answered in every state.
+ public record GetHealthReport;
+}
diff --git a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs
new file mode 100644
index 0000000..872ef57
--- /dev/null
+++ b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs
@@ -0,0 +1,142 @@
+using Akka.Actor;
+using Akka.Event;
+using ScadaLink.Commons.Interfaces.Protocol;
+using ScadaLink.Commons.Messages.DataConnection;
+
+namespace ScadaLink.DataConnectionLayer.Actors;
+
+///
+/// WP-34: Protocol extensibility — manages DataConnectionActor instances.
+/// Routes messages to the correct connection actor based on connection name.
+/// Adding a new protocol = implement IDataConnection + register with IDataConnectionFactory.
+///
+public class DataConnectionManagerActor : ReceiveActor
+{
+    private readonly ILoggingAdapter _log = Context.GetLogger();
+    private readonly IDataConnectionFactory _factory;
+    private readonly DataConnectionOptions _options;
+
+    /// <summary>Child connection actors keyed by connection name.</summary>
+    private readonly Dictionary<string, IActorRef> _connectionActors = new();
+
+    /// <summary>
+    /// Creates the manager and registers one handler per supported message type.
+    /// </summary>
+    /// <param name="factory">Creates the protocol adapter for each new connection.</param>
+    /// <param name="options">Options handed to every connection actor.</param>
+    public DataConnectionManagerActor(
+        IDataConnectionFactory factory,
+        DataConnectionOptions options)
+    {
+        _factory = factory;
+        _options = options;
+
+        Receive<CreateConnectionCommand>(HandleCreateConnection);
+        Receive<SubscribeTagsRequest>(HandleRoute);
+        Receive<UnsubscribeTagsRequest>(HandleRoute);
+        Receive<WriteTagRequest>(HandleRouteWrite);
+        Receive<RemoveConnectionCommand>(HandleRemoveConnection);
+        Receive<GetAllHealthReports>(HandleGetAllHealthReports);
+    }
+
+    /// <summary>
+    /// Creates a connection actor for the named connection unless one already exists.
+    /// A failure to build the adapter or the child actor is logged and the command is
+    /// dropped, so one bad command cannot crash the manager and its existing connections.
+    /// </summary>
+    private void HandleCreateConnection(CreateConnectionCommand command)
+    {
+        if (_connectionActors.ContainsKey(command.ConnectionName))
+        {
+            _log.Warning("Connection {0} already exists", command.ConnectionName);
+            return;
+        }
+
+        try
+        {
+            // WP-34: Factory creates the correct adapter based on protocol type
+            var adapter = _factory.Create(command.ProtocolType, command.ConnectionDetails);
+
+            var props = Props.Create(() => new DataConnectionActor(
+                command.ConnectionName, adapter, _options));
+
+            // ActorOf throws for names that are not valid actor path elements.
+            _connectionActors[command.ConnectionName] = Context.ActorOf(props, command.ConnectionName);
+        }
+        catch (Exception ex)
+        {
+            _log.Error(ex, "Failed to create DataConnectionActor for {0} (protocol={1})",
+                command.ConnectionName, command.ProtocolType);
+            return;
+        }
+
+        _log.Info("Created DataConnectionActor for {0} (protocol={1})",
+            command.ConnectionName, command.ProtocolType);
+    }
+
+    /// <summary>
+    /// Forwards a subscribe request to its connection actor, or replies with a failed
+    /// response when the connection is unknown.
+    /// </summary>
+    private void HandleRoute(SubscribeTagsRequest request)
+    {
+        if (!_connectionActors.TryGetValue(request.ConnectionName, out var target))
+        {
+            _log.Warning("No connection actor for {0}", request.ConnectionName);
+            var rejection = new SubscribeTagsResponse(
+                request.CorrelationId, request.InstanceUniqueName, false,
+                $"Unknown connection: {request.ConnectionName}", DateTimeOffset.UtcNow);
+            Sender.Tell(rejection);
+            return;
+        }
+
+        target.Forward(request);
+    }
+
+    /// <summary>
+    /// Forwards an unsubscribe request to its connection actor; an unknown connection is
+    /// only logged.
+    /// </summary>
+    private void HandleRoute(UnsubscribeTagsRequest request)
+    {
+        if (!_connectionActors.TryGetValue(request.ConnectionName, out var target))
+        {
+            _log.Warning("No connection actor for {0} during unsubscribe", request.ConnectionName);
+            return;
+        }
+
+        target.Forward(request);
+    }
+
+    /// <summary>
+    /// Forwards a write request to its connection actor, or replies with a failed response
+    /// when the connection is unknown.
+    /// </summary>
+    private void HandleRouteWrite(WriteTagRequest request)
+    {
+        if (!_connectionActors.TryGetValue(request.ConnectionName, out var target))
+        {
+            _log.Warning("No connection actor for {0}", request.ConnectionName);
+            var rejection = new WriteTagResponse(
+                request.CorrelationId, false,
+                $"Unknown connection: {request.ConnectionName}", DateTimeOffset.UtcNow);
+            Sender.Tell(rejection);
+            return;
+        }
+
+        target.Forward(request);
+    }
+
+    /// <summary>
+    /// Stops the actor of the named connection and forgets it; an unknown name is ignored.
+    /// </summary>
+    private void HandleRemoveConnection(RemoveConnectionCommand command)
+    {
+        var name = command.ConnectionName;
+        if (!_connectionActors.TryGetValue(name, out var actor))
+            return;
+
+        Context.Stop(actor);
+        _connectionActors.Remove(name);
+        _log.Info("Removed DataConnectionActor for {0}", name);
+    }
+
+    /// <summary>
+    /// Forwards a health report request to every connection actor. Because the request is
+    /// forwarded, each actor replies to the original sender.
+    /// </summary>
+    private void HandleGetAllHealthReports(GetAllHealthReports _)
+    {
+        foreach (var (_, connection) in _connectionActors)
+        {
+            connection.Forward(new DataConnectionActor.GetHealthReport());
+        }
+    }
+
+    /// <summary>
+    /// Supervises connection actors one-for-one: any exception is logged and answered with
+    /// a restart, limited to 10 retries within one minute.
+    /// </summary>
+    protected override SupervisorStrategy SupervisorStrategy()
+    {
+        Directive RestartOnFailure(Exception ex)
+        {
+            _log.Warning(ex, "DataConnectionActor threw exception, restarting");
+            return Directive.Restart;
+        }
+
+        return new OneForOneStrategy(
+            maxNrOfRetries: 10,
+            withinTimeRange: TimeSpan.FromMinutes(1),
+            decider: Decider.From(RestartOnFailure));
+    }
+
+/// <summary>
+/// Command to create a new data connection actor for a specific protocol.
+/// </summary>
+// NOTE(review): the type arguments of IDictionary below are missing in this copy of the
+// diff — confirm against the real file.
+public record CreateConnectionCommand(
+ string ConnectionName,
+ string ProtocolType,
+ IDictionary ConnectionDetails);
+
+/// <summary>
+/// Command to remove a data connection actor.
+/// </summary>
+public record RemoveConnectionCommand(string ConnectionName);
+
+/// <summary>
+/// Request for health reports from all active connections.
+/// </summary>
+public record GetAllHealthReports;
diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/ILmxProxyClient.cs b/src/ScadaLink.DataConnectionLayer/Adapters/ILmxProxyClient.cs
new file mode 100644
index 0000000..61608e0
--- /dev/null
+++ b/src/ScadaLink.DataConnectionLayer/Adapters/ILmxProxyClient.cs
@@ -0,0 +1,120 @@
+namespace ScadaLink.DataConnectionLayer.Adapters;
+
+/// <summary>
+/// WP-8: Abstraction over the LmxProxy SDK client for testability.
+/// The actual LmxProxyClient SDK lives in a separate repo; this interface
+/// defines the contract the adapter depends on.
+///
+/// LmxProxy uses gRPC streaming for subscriptions and a session-based model
+/// with keep-alive for connection management.
+/// </summary>
+public interface ILmxProxyClient : IAsyncDisposable
+{
+    /// <summary>
+    /// Opens a session to the LmxProxy server. Returns a session ID.
+    /// </summary>
+    Task<string> OpenSessionAsync(string host, int port, CancellationToken cancellationToken = default);
+
+    /// <summary>
+    /// Closes the current session.
+    /// </summary>
+    Task CloseSessionAsync(CancellationToken cancellationToken = default);
+
+    /// <summary>
+    /// Sends a keep-alive to maintain the session.
+    /// </summary>
+    Task SendKeepAliveAsync(CancellationToken cancellationToken = default);
+
+    /// <summary>Whether a session is currently open.</summary>
+    bool IsConnected { get; }
+
+    /// <summary>ID of the current session, or null when no session is open.</summary>
+    string? SessionId { get; }
+
+    /// <summary>
+    /// Subscribes to tag value changes via gRPC streaming. Returns a subscription handle.
+    /// </summary>
+    /// <param name="tagPath">Tag to subscribe to.</param>
+    /// <param name="onValueChanged">
+    /// Callback receiving the tag path, the value, its timestamp and whether the quality is good.
+    /// </param>
+    /// <param name="cancellationToken">Cancels the subscribe call.</param>
+    Task<string> SubscribeTagAsync(
+        string tagPath,
+        Action<string, object?, DateTime, bool> onValueChanged,
+        CancellationToken cancellationToken = default);
+
+    /// <summary>Ends the subscription identified by the given handle.</summary>
+    Task UnsubscribeTagAsync(string subscriptionHandle, CancellationToken cancellationToken = default);
+
+    /// <summary>Reads the current value, timestamp and quality flag of a tag.</summary>
+    Task<(object? Value, DateTime Timestamp, bool IsGood)> ReadTagAsync(
+        string tagPath, CancellationToken cancellationToken = default);
+
+    /// <summary>Writes a value to a tag. Returns true when the write succeeded.</summary>
+    Task<bool> WriteTagAsync(string tagPath, object? value, CancellationToken cancellationToken = default);
+}
+
+/// <summary>
+/// Factory for creating ILmxProxyClient instances.
+/// </summary>
+public interface ILmxProxyClientFactory
+{
+ // Returns a new client instance.
+ ILmxProxyClient Create();
+}
+
+/// <summary>
+/// Default factory that creates stub LmxProxy clients.
+/// In production, this would create real LmxProxy SDK client instances.
+/// </summary>
+public class DefaultLmxProxyClientFactory : ILmxProxyClientFactory
+{
+ // Always returns a fresh StubLmxProxyClient.
+ public ILmxProxyClient Create() => new StubLmxProxyClient();
+}
+
+/// <summary>
+/// Stub LmxProxy client for development/testing. Every operation completes immediately;
+/// subscriptions never invoke their callback.
+/// </summary>
+internal class StubLmxProxyClient : ILmxProxyClient
+{
+    public bool IsConnected { get; private set; }
+    public string? SessionId { get; private set; }
+
+    /// <summary>Marks the client connected and returns a freshly generated session ID.</summary>
+    public Task<string> OpenSessionAsync(string host, int port, CancellationToken cancellationToken = default)
+    {
+        var sessionId = Guid.NewGuid().ToString();
+        SessionId = sessionId;
+        IsConnected = true;
+        return Task.FromResult(sessionId);
+    }
+
+    /// <summary>Marks the client disconnected and clears the session ID.</summary>
+    public Task CloseSessionAsync(CancellationToken cancellationToken = default)
+    {
+        IsConnected = false;
+        SessionId = null;
+        return Task.CompletedTask;
+    }
+
+    /// <summary>No-op.</summary>
+    public Task SendKeepAliveAsync(CancellationToken cancellationToken = default)
+    {
+        return Task.CompletedTask;
+    }
+
+    /// <summary>Returns a new random handle; the callback is never invoked.</summary>
+    public Task<string> SubscribeTagAsync(
+        string tagPath, Action<string, object?, DateTime, bool> onValueChanged,
+        CancellationToken cancellationToken = default)
+    {
+        return Task.FromResult(Guid.NewGuid().ToString());
+    }
+
+    /// <summary>No-op.</summary>
+    public Task UnsubscribeTagAsync(string subscriptionHandle, CancellationToken cancellationToken = default)
+    {
+        return Task.CompletedTask;
+    }
+
+    /// <summary>Returns a null value with the current UTC time and good quality.</summary>
+    public Task<(object? Value, DateTime Timestamp, bool IsGood)> ReadTagAsync(
+        string tagPath, CancellationToken cancellationToken = default)
+    {
+        return Task.FromResult<(object?, DateTime, bool)>((null, DateTime.UtcNow, true));
+    }
+
+    /// <summary>Always reports success.</summary>
+    public Task<bool> WriteTagAsync(string tagPath, object? value, CancellationToken cancellationToken = default)
+    {
+        return Task.FromResult(true);
+    }
+
+    /// <summary>Marks the client disconnected and clears the session ID.</summary>
+    public ValueTask DisposeAsync()
+    {
+        IsConnected = false;
+        SessionId = null;
+        return ValueTask.CompletedTask;
+    }
+}
diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/IOpcUaClient.cs b/src/ScadaLink.DataConnectionLayer/Adapters/IOpcUaClient.cs
new file mode 100644
index 0000000..7189580
--- /dev/null
+++ b/src/ScadaLink.DataConnectionLayer/Adapters/IOpcUaClient.cs
@@ -0,0 +1,94 @@
+namespace ScadaLink.DataConnectionLayer.Adapters;
+
+/// <summary>
+/// WP-7: Abstraction over OPC UA client library for testability.
+/// The real implementation would wrap an OPC UA SDK (e.g., OPC Foundation .NET Standard Library).
+/// </summary>
+// NOTE(review): generic type arguments appear to be missing in this copy of the diff
+// (the subscribe and write methods return bare Task, the callback is a bare Action) —
+// confirm the real signatures against the source file.
+public interface IOpcUaClient : IAsyncDisposable
+{
+ // Connects to the given endpoint URL.
+ Task ConnectAsync(string endpointUrl, CancellationToken cancellationToken = default);
+ // Disconnects from the server.
+ Task DisconnectAsync(CancellationToken cancellationToken = default);
+ // Whether the client is currently connected.
+ bool IsConnected { get; }
+
+ /// <summary>
+ /// Creates a monitored item subscription for a node. Returns a subscription handle.
+ /// </summary>
+ Task CreateSubscriptionAsync(
+ string nodeId,
+ Action onValueChanged,
+ CancellationToken cancellationToken = default);
+
+ // Removes the subscription identified by the given handle.
+ Task RemoveSubscriptionAsync(string subscriptionHandle, CancellationToken cancellationToken = default);
+
+ // Reads a node and returns its value, source timestamp and status code.
+ Task<(object? Value, DateTime SourceTimestamp, uint StatusCode)> ReadValueAsync(
+ string nodeId, CancellationToken cancellationToken = default);
+
+ // Writes a value to a node.
+ Task WriteValueAsync(string nodeId, object? value, CancellationToken cancellationToken = default);
+}
+
+/// <summary>
+/// Factory for creating IOpcUaClient instances.
+/// </summary>
+public interface IOpcUaClientFactory
+{
+ // Returns a new client instance.
+ IOpcUaClient Create();
+}
+
+/// <summary>
+/// Default factory that creates stub OPC UA clients.
+/// In production, this would create real OPC UA SDK client instances.
+/// </summary>
+public class DefaultOpcUaClientFactory : IOpcUaClientFactory
+{
+ // Always returns a fresh StubOpcUaClient.
+ public IOpcUaClient Create() => new StubOpcUaClient();
+}
+
+/// <summary>
+/// Stub OPC UA client for development/testing. A real implementation would
+/// wrap the OPC Foundation .NET Standard Library.
+/// </summary>
+// NOTE(review): generic type arguments appear to be missing in this copy of the diff
+// (bare Task return types and a bare Action callback) — confirm against the source file.
+internal class StubOpcUaClient : IOpcUaClient
+{
+ public bool IsConnected { get; private set; }
+
+ // Marks the client connected; the endpoint URL is ignored.
+ public Task ConnectAsync(string endpointUrl, CancellationToken cancellationToken = default)
+ {
+ IsConnected = true;
+ return Task.CompletedTask;
+ }
+
+ // Marks the client disconnected.
+ public Task DisconnectAsync(CancellationToken cancellationToken = default)
+ {
+ IsConnected = false;
+ return Task.CompletedTask;
+ }
+
+ // Returns a new random handle; the callback is never invoked.
+ public Task CreateSubscriptionAsync(
+ string nodeId, Action onValueChanged,
+ CancellationToken cancellationToken = default)
+ {
+ return Task.FromResult(Guid.NewGuid().ToString());
+ }
+
+ // No-op.
+ public Task RemoveSubscriptionAsync(string subscriptionHandle, CancellationToken cancellationToken = default)
+ {
+ return Task.CompletedTask;
+ }
+
+ // Returns a null value with the current UTC time and status code 0.
+ public Task<(object? Value, DateTime SourceTimestamp, uint StatusCode)> ReadValueAsync(
+ string nodeId, CancellationToken cancellationToken = default)
+ {
+ return Task.FromResult<(object?, DateTime, uint)>((null, DateTime.UtcNow, 0));
+ }
+
+ // Always returns 0, which the stub treats as a good status.
+ public Task WriteValueAsync(string nodeId, object? value, CancellationToken cancellationToken = default)
+ {
+ return Task.FromResult(0); // Good status
+ }
+
+ // Marks the client disconnected.
+ public ValueTask DisposeAsync()
+ {
+ IsConnected = false;
+ return ValueTask.CompletedTask;
+ }
+}
diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyDataConnection.cs b/src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyDataConnection.cs
new file mode 100644
index 0000000..a4d0222
--- /dev/null
+++ b/src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyDataConnection.cs
@@ -0,0 +1,196 @@
+using Microsoft.Extensions.Logging;
+using ScadaLink.Commons.Interfaces.Protocol;
+using ScadaLink.Commons.Types.Enums;
+
+namespace ScadaLink.DataConnectionLayer.Adapters;
+
+///
+/// WP-8: LmxProxy adapter implementing IDataConnection.
+/// Maps IDataConnection to LmxProxy SDK calls.
+///
+/// LmxProxy-specific behavior:
+/// - Session-based connection with 30s keep-alive
+/// - gRPC streaming for subscriptions
+/// - SessionId management (required for all operations)
+///
+public class LmxProxyDataConnection : IDataConnection
+{
+ private readonly ILmxProxyClientFactory _clientFactory;
+ private readonly ILogger _logger;
+ private ILmxProxyClient? _client;
+ private string _host = "localhost";
+ private int _port = 5000;
+ private ConnectionHealth _status = ConnectionHealth.Disconnected;
+ private Timer? _keepAliveTimer;
+
+ private readonly Dictionary _subscriptionHandles = new();
+
+ public LmxProxyDataConnection(ILmxProxyClientFactory clientFactory, ILogger logger)
+ {
+ _clientFactory = clientFactory;
+ _logger = logger;
+ }
+
+ public ConnectionHealth Status => _status;
+
+    /// <summary>
+    /// Opens a new LmxProxy session and starts the 30s keep-alive timer.
+    /// </summary>
+    /// <param name="connectionDetails">
+    /// Optional "Host" (default "localhost") and "Port" entries; an absent or non-numeric
+    /// port keeps the current port value.
+    /// </param>
+    /// <param name="cancellationToken">Cancels the session open call.</param>
+    /// <remarks>
+    /// May be called again after a failure or disconnect: the previous keep-alive timer and
+    /// client are released first so repeated reconnect attempts do not leak them. When the
+    /// session cannot be opened, the status goes back to Disconnected and the exception is rethrown.
+    /// </remarks>
+    public async Task ConnectAsync(IDictionary<string, string> connectionDetails, CancellationToken cancellationToken = default)
+    {
+        _host = connectionDetails.TryGetValue("Host", out var host) ? host : "localhost";
+        if (connectionDetails.TryGetValue("Port", out var portStr) && int.TryParse(portStr, out var port))
+            _port = port;
+
+        // Release whatever a previous connection attempt left behind.
+        _keepAliveTimer?.Dispose();
+        _keepAliveTimer = null;
+        if (_client is not null)
+        {
+            var previous = _client;
+            _client = null;
+            try
+            {
+                await previous.DisposeAsync();
+            }
+            catch (Exception ex)
+            {
+                // A dead client failing to dispose must not block the reconnect.
+                _logger.LogWarning(ex, "LmxProxy failed to dispose previous client for {Host}:{Port}", _host, _port);
+            }
+        }
+        _subscriptionHandles.Clear();
+
+        _status = ConnectionHealth.Connecting;
+        var client = _clientFactory.Create();
+        string sessionId;
+        try
+        {
+            sessionId = await client.OpenSessionAsync(_host, _port, cancellationToken);
+        }
+        catch
+        {
+            _status = ConnectionHealth.Disconnected;
+            await client.DisposeAsync();
+            throw;
+        }
+
+        _client = client;
+        _status = ConnectionHealth.Connected;
+
+        // Start 30s keep-alive timer per design spec. SendKeepAliveAsync handles its own
+        // exceptions, so the returned task can be discarded.
+        _keepAliveTimer = new Timer(
+            _ => _ = SendKeepAliveAsync(),
+            null,
+            TimeSpan.FromSeconds(30),
+            TimeSpan.FromSeconds(30));
+
+        _logger.LogInformation("LmxProxy connected to {Host}:{Port}, sessionId={SessionId}", _host, _port, sessionId);
+    }
+
+    /// <summary>
+    /// Stops the keep-alive timer and, when a client exists, closes its session and marks
+    /// the connection Disconnected.
+    /// </summary>
+    public async Task DisconnectAsync(CancellationToken cancellationToken = default)
+    {
+        _keepAliveTimer?.Dispose();
+        _keepAliveTimer = null;
+
+        if (_client is null)
+            return;
+
+        await _client.CloseSessionAsync(cancellationToken);
+        _status = ConnectionHealth.Disconnected;
+        _logger.LogInformation("LmxProxy disconnected from {Host}:{Port}", _host, _port);
+    }
+
+    /// <summary>
+    /// Subscribes to a tag and forwards each change to <paramref name="callback"/> as a
+    /// TagValue. Returns the subscription handle issued by the client.
+    /// </summary>
+    /// <exception cref="InvalidOperationException">The client is not connected.</exception>
+    public async Task<string> SubscribeAsync(string tagPath, SubscriptionCallback callback, CancellationToken cancellationToken = default)
+    {
+        EnsureConnected();
+
+        var handle = await _client!.SubscribeTagAsync(
+            tagPath,
+            (path, value, timestamp, isGood) =>
+            {
+                var quality = isGood ? QualityCode.Good : QualityCode.Bad;
+
+                // new DateTimeOffset(timestamp, TimeSpan.Zero) throws for Local-kind values on
+                // machines with a non-zero UTC offset. Convert Local to UTC; treat Utc and
+                // Unspecified as UTC ticks, as before.
+                var utc = timestamp.Kind == DateTimeKind.Local
+                    ? timestamp.ToUniversalTime()
+                    : DateTime.SpecifyKind(timestamp, DateTimeKind.Utc);
+                callback(path, new TagValue(value, quality, new DateTimeOffset(utc)));
+            },
+            cancellationToken);
+
+        _subscriptionHandles[handle] = tagPath;
+        return handle;
+    }
+
+    /// <summary>
+    /// Ends a subscription on the client and forgets its handle. Does nothing when no
+    /// client exists.
+    /// </summary>
+    public async Task UnsubscribeAsync(string subscriptionId, CancellationToken cancellationToken = default)
+    {
+        if (_client is null)
+            return;
+
+        await _client.UnsubscribeTagAsync(subscriptionId, cancellationToken);
+        _subscriptionHandles.Remove(subscriptionId);
+    }
+
+    /// <summary>
+    /// Reads a tag. A bad-quality read is returned as a failed ReadResult with an error
+    /// message; a good read carries the value with Good quality.
+    /// </summary>
+    /// <exception cref="InvalidOperationException">The client is not connected.</exception>
+    public async Task<ReadResult> ReadAsync(string tagPath, CancellationToken cancellationToken = default)
+    {
+        EnsureConnected();
+
+        var (value, timestamp, isGood) = await _client!.ReadTagAsync(tagPath, cancellationToken);
+
+        if (!isGood)
+            return new ReadResult(false, null, "LmxProxy read returned bad quality");
+
+        // new DateTimeOffset(timestamp, TimeSpan.Zero) throws for Local-kind values on machines
+        // with a non-zero UTC offset. Convert Local to UTC; treat Utc and Unspecified as UTC ticks.
+        var utc = timestamp.Kind == DateTimeKind.Local
+            ? timestamp.ToUniversalTime()
+            : DateTime.SpecifyKind(timestamp, DateTimeKind.Utc);
+
+        return new ReadResult(true, new TagValue(value, QualityCode.Good, new DateTimeOffset(utc)), null);
+    }
+
+ // Reads the given tags one after another via ReadAsync and returns the results keyed by
+ // tag path; a path that occurs twice keeps the result of its last read.
+ // NOTE(review): the generic type arguments in this signature are garbled in this copy of
+ // the diff — confirm against the source file.
+ public async Task> ReadBatchAsync(IEnumerable tagPaths, CancellationToken cancellationToken = default)
+ {
+ var results = new Dictionary();
+ foreach (var tagPath in tagPaths)
+ {
+ results[tagPath] = await ReadAsync(tagPath, cancellationToken);
+ }
+ return results;
+ }
+
+    /// <summary>
+    /// Writes a value to a tag and maps the client's boolean outcome to a WriteResult.
+    /// Throws InvalidOperationException when the client is not connected.
+    /// </summary>
+    public async Task WriteAsync(string tagPath, object? value, CancellationToken cancellationToken = default)
+    {
+        EnsureConnected();
+
+        var written = await _client!.WriteTagAsync(tagPath, value, cancellationToken);
+        if (written)
+            return new WriteResult(true, null);
+
+        return new WriteResult(false, "LmxProxy write failed");
+    }
+
+ // Writes the given values one after another via WriteAsync and returns the results keyed
+ // by tag path. Later writes still run when an earlier one reports failure.
+ // NOTE(review): the generic type arguments in this signature are garbled in this copy of
+ // the diff — confirm against the source file.
+ public async Task> WriteBatchAsync(IDictionary values, CancellationToken cancellationToken = default)
+ {
+ var results = new Dictionary();
+ foreach (var (tagPath, value) in values)
+ {
+ results[tagPath] = await WriteAsync(tagPath, value, cancellationToken);
+ }
+ return results;
+ }
+
+ // Writes the values plus the flag tag, then polls responsePath until it equals
+ // responseValue or the timeout elapses. Returns true on a match; false when any write
+ // failed or the timeout is reached.
+ // NOTE(review): generic type arguments are garbled in this copy of the diff — confirm
+ // the signature against the source file.
+ public async Task WriteBatchAndWaitAsync(
+ IDictionary values, string flagPath, object? flagValue,
+ string responsePath, object? responseValue, TimeSpan timeout,
+ CancellationToken cancellationToken = default)
+ {
+ // The flag is added to a copy of the batch, so the caller's dictionary is not modified.
+ var allValues = new Dictionary(values) { [flagPath] = flagValue };
+ var writeResults = await WriteBatchAsync(allValues, cancellationToken);
+
+ if (writeResults.Values.Any(r => !r.Success))
+ return false;
+
+ var deadline = DateTimeOffset.UtcNow + timeout;
+ while (DateTimeOffset.UtcNow < deadline)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ var readResult = await ReadAsync(responsePath, cancellationToken);
+ // Compared with object.Equals, so the response must match in both type and value.
+ if (readResult.Success && readResult.Value != null && Equals(readResult.Value.Value, responseValue))
+ return true;
+
+ // Poll every 100 ms.
+ await Task.Delay(100, cancellationToken);
+ }
+
+ return false;
+ }
+
+    /// <summary>
+    /// Stops the keep-alive timer, disposes and forgets the client, and marks the
+    /// connection Disconnected.
+    /// </summary>
+    public async ValueTask DisposeAsync()
+    {
+        _keepAliveTimer?.Dispose();
+        _keepAliveTimer = null;
+
+        if (_client is not null)
+        {
+            await _client.DisposeAsync();
+            _client = null;
+        }
+
+        _status = ConnectionHealth.Disconnected;
+    }
+
+    /// <summary>
+    /// Throws <see cref="InvalidOperationException"/> unless a client exists and reports
+    /// that it is connected.
+    /// </summary>
+    private void EnsureConnected()
+    {
+        if (_client is { IsConnected: true })
+            return;
+
+        throw new InvalidOperationException("LmxProxy client is not connected.");
+    }
+
+    /// <summary>
+    /// Sends one keep-alive when the client is connected. Failures are logged as warnings
+    /// and never propagate to the caller.
+    /// </summary>
+    private async Task SendKeepAliveAsync()
+    {
+        try
+        {
+            if (_client is { IsConnected: true } client)
+                await client.SendKeepAliveAsync();
+        }
+        catch (Exception ex)
+        {
+            _logger.LogWarning(ex, "LmxProxy keep-alive failed for {Host}:{Port}", _host, _port);
+        }
+    }
+}
diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs b/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs
new file mode 100644
index 0000000..b527213
--- /dev/null
+++ b/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs
@@ -0,0 +1,183 @@
+using Microsoft.Extensions.Logging;
+using ScadaLink.Commons.Interfaces.Protocol;
+using ScadaLink.Commons.Types.Enums;
+
+namespace ScadaLink.DataConnectionLayer.Adapters;
+
+/// <summary>
+/// WP-7: OPC UA adapter implementing IDataConnection.
+/// Maps IDataConnection methods to OPC UA concepts via IOpcUaClient abstraction.
+///
+/// OPC UA mapping:
+/// - TagPath → NodeId (e.g., "ns=2;s=MyDevice.Temperature")
+/// - Subscribe → MonitoredItem with DataChangeNotification
+/// - Read/Write → Read/Write service calls
+/// - Quality → OPC UA StatusCode severity bits (see MapStatusCode)
+/// </summary>
+public class OpcUaDataConnection : IDataConnection
+{
+    private readonly IOpcUaClientFactory _clientFactory;
+    private readonly ILogger _logger;
+    private IOpcUaClient? _client;
+    private string _endpointUrl = string.Empty;
+    private ConnectionHealth _status = ConnectionHealth.Disconnected;
+
+    /// <summary>Maps subscription IDs to their tag paths for cleanup.</summary>
+    private readonly Dictionary _subscriptionHandles = new();
+
+    public OpcUaDataConnection(IOpcUaClientFactory clientFactory, ILogger logger)
+    {
+        _clientFactory = clientFactory;
+        _logger = logger;
+    }
+
+    public ConnectionHealth Status => _status;
+
+    /// <summary>Connects to connectionDetails["EndpointUrl"] (default opc.tcp://localhost:4840).</summary>
+    public async Task ConnectAsync(IDictionary connectionDetails, CancellationToken cancellationToken = default)
+    {
+        _endpointUrl = connectionDetails.TryGetValue("EndpointUrl", out var url) ? url : "opc.tcp://localhost:4840";
+        _status = ConnectionHealth.Connecting;
+        var client = _clientFactory.Create();
+        try
+        {
+            await client.ConnectAsync(_endpointUrl, cancellationToken);
+        }
+        catch
+        {
+            // Don't stay stuck in Connecting, and don't leak a client that never came up.
+            _status = ConnectionHealth.Disconnected;
+            await client.DisposeAsync();
+            throw;
+        }
+        _client = client;
+        _status = ConnectionHealth.Connected;
+        _logger.LogInformation("OPC UA connected to {Endpoint}", _endpointUrl);
+    }
+
+    /// <summary>Disconnects the client, if any, and marks the connection Disconnected.</summary>
+    public async Task DisconnectAsync(CancellationToken cancellationToken = default)
+    {
+        if (_client == null) return;
+        await _client.DisconnectAsync(cancellationToken);
+        _status = ConnectionHealth.Disconnected;
+        _logger.LogInformation("OPC UA disconnected from {Endpoint}", _endpointUrl);
+    }
+
+    /// <summary>Creates a client subscription for the tag and forwards each notification to the callback as a TagValue.</summary>
+    public async Task SubscribeAsync(string tagPath, SubscriptionCallback callback, CancellationToken cancellationToken = default)
+    {
+        EnsureConnected();
+
+        // NOTE(review): if timestamp is a Local-kind DateTime, this DateTimeOffset constructor throws on
+        // hosts whose local offset is not zero — presumably IOpcUaClient always supplies UTC; confirm.
+        var subscriptionId = await _client!.CreateSubscriptionAsync(
+            tagPath,
+            (nodeId, value, timestamp, statusCode) =>
+                callback(tagPath, new TagValue(value, MapStatusCode(statusCode), new DateTimeOffset(timestamp, TimeSpan.Zero))),
+            cancellationToken);
+
+        _subscriptionHandles[subscriptionId] = tagPath;
+        return subscriptionId;
+    }
+
+    /// <summary>Removes the subscription from the client and forgets its handle; no-op when there is no client.</summary>
+    public async Task UnsubscribeAsync(string subscriptionId, CancellationToken cancellationToken = default)
+    {
+        if (_client == null) return;
+        await _client.RemoveSubscriptionAsync(subscriptionId, cancellationToken);
+        _subscriptionHandles.Remove(subscriptionId);
+    }
+
+    /// <summary>Reads one tag; a Bad-severity status yields a failed ReadResult, anything else a TagValue.</summary>
+    public async Task ReadAsync(string tagPath, CancellationToken cancellationToken = default)
+    {
+        EnsureConnected();
+        var (value, timestamp, statusCode) = await _client!.ReadValueAsync(tagPath, cancellationToken);
+        var quality = MapStatusCode(statusCode);
+        if (quality == QualityCode.Bad)
+            return new ReadResult(false, null, $"OPC UA read returned bad status: 0x{statusCode:X8}");
+
+        return new ReadResult(true, new TagValue(value, quality, new DateTimeOffset(timestamp, TimeSpan.Zero)), null);
+    }
+
+    /// <summary>Reads each tag sequentially via ReadAsync.</summary>
+    public async Task> ReadBatchAsync(IEnumerable tagPaths, CancellationToken cancellationToken = default)
+    {
+        var results = new Dictionary();
+        foreach (var tagPath in tagPaths)
+            results[tagPath] = await ReadAsync(tagPath, cancellationToken);
+        return results;
+    }
+
+    /// <summary>Writes one tag; succeeds for any Good-severity status, including Good codes that carry sub-status bits.</summary>
+    public async Task WriteAsync(string tagPath, object? value, CancellationToken cancellationToken = default)
+    {
+        EnsureConnected();
+        var statusCode = await _client!.WriteValueAsync(tagPath, value, cancellationToken);
+        return (statusCode & 0xC0000000) == 0 // severity bits 00 = Good
+            ? new WriteResult(true, null)
+            : new WriteResult(false, $"OPC UA write failed with status: 0x{statusCode:X8}");
+    }
+
+    /// <summary>Writes each value sequentially via WriteAsync.</summary>
+    public async Task> WriteBatchAsync(IDictionary values, CancellationToken cancellationToken = default)
+    {
+        var results = new Dictionary();
+        foreach (var (tagPath, value) in values)
+            results[tagPath] = await WriteAsync(tagPath, value, cancellationToken);
+        return results;
+    }
+
+    /// <summary>Writes the values, then the flag (always last), then polls the response tag until it matches or the timeout elapses.</summary>
+    public async Task WriteBatchAndWaitAsync(
+        IDictionary values, string flagPath, object? flagValue,
+        string responsePath, object? responseValue, TimeSpan timeout,
+        CancellationToken cancellationToken = default)
+    {
+        // The flag presumably signals that the payload is complete, so it is written on its own afterwards: Dictionary enumeration order is unspecified.
+        var payload = new Dictionary(values);
+        payload.Remove(flagPath);
+        var payloadResults = await WriteBatchAsync(payload, cancellationToken);
+        if (payloadResults.Values.Any(r => !r.Success)) return false;
+        if (!(await WriteAsync(flagPath, flagValue, cancellationToken)).Success) return false;
+
+        // Poll for response value within timeout (every 100 ms)
+        var deadline = DateTimeOffset.UtcNow + timeout;
+        while (DateTimeOffset.UtcNow < deadline)
+        {
+            cancellationToken.ThrowIfCancellationRequested();
+            var readResult = await ReadAsync(responsePath, cancellationToken);
+            if (readResult.Success && readResult.Value != null && Equals(readResult.Value.Value, responseValue))
+                return true;
+            await Task.Delay(100, cancellationToken);
+        }
+        return false;
+    }
+
+    /// <summary>Disposes the client, if any, and reports Disconnected.</summary>
+    public async ValueTask DisposeAsync()
+    {
+        if (_client != null)
+        {
+            await _client.DisposeAsync();
+            _client = null;
+        }
+        _status = ConnectionHealth.Disconnected;
+    }
+
+    private void EnsureConnected()
+    {
+        if (_client == null || !_client.IsConnected)
+            throw new InvalidOperationException("OPC UA client is not connected.");
+    }
+
+    /// <summary>
+    /// Maps an OPC UA StatusCode to QualityCode using its severity field (the two most significant bits):
+    /// 00 = Good, 01 = Uncertain, 10 or 11 = Bad. Sub-status and info bits do not affect the result.
+    /// </summary>
+    private static QualityCode MapStatusCode(uint statusCode) => (statusCode >> 30) switch
+    {
+        0 => QualityCode.Good,
+        1 => QualityCode.Uncertain,
+        _ => QualityCode.Bad,
+    };
+}
diff --git a/src/ScadaLink.DataConnectionLayer/DataConnectionFactory.cs b/src/ScadaLink.DataConnectionLayer/DataConnectionFactory.cs
new file mode 100644
index 0000000..7aa30ef
--- /dev/null
+++ b/src/ScadaLink.DataConnectionLayer/DataConnectionFactory.cs
@@ -0,0 +1,43 @@
+using Microsoft.Extensions.Logging;
+using ScadaLink.Commons.Interfaces.Protocol;
+using ScadaLink.DataConnectionLayer.Adapters;
+
+namespace ScadaLink.DataConnectionLayer;
+
+/// <summary>
+/// WP-34: Default IDataConnectionFactory. Looks up an adapter builder by protocol type
+/// (case-insensitive) and invokes it; "OpcUa" and "LmxProxy" are registered out of the box.
+/// </summary>
+public class DataConnectionFactory : IDataConnectionFactory
+{
+    private readonly Dictionary, IDataConnection>> _factories = new(StringComparer.OrdinalIgnoreCase);
+    private readonly ILoggerFactory _loggerFactory;
+
+    public DataConnectionFactory(ILoggerFactory loggerFactory)
+    {
+        _loggerFactory = loggerFactory;
+
+        // Built-in protocols; neither builder uses the connection details it is handed.
+        RegisterAdapter("OpcUa", _ => new OpcUaDataConnection(
+            new DefaultOpcUaClientFactory(), _loggerFactory.CreateLogger()));
+        RegisterAdapter("LmxProxy", _ => new LmxProxyDataConnection(
+            new DefaultLmxProxyClientFactory(), _loggerFactory.CreateLogger()));
+    }
+
+    /// <summary>
+    /// Adds or replaces the adapter builder for <paramref name="protocolType"/>. This is the
+    /// extension point for supporting new protocols without touching existing code.
+    /// </summary>
+    public void RegisterAdapter(string protocolType, Func, IDataConnection> factory)
+        => _factories[protocolType] = factory;
+
+    /// <summary>Builds the adapter registered for the protocol type.</summary>
+    /// <exception cref="ArgumentException">No adapter is registered under that protocol type.</exception>
+    public IDataConnection Create(string protocolType, IDictionary connectionDetails)
+    {
+        if (_factories.TryGetValue(protocolType, out var build))
+            return build(connectionDetails);
+
+        throw new ArgumentException($"Unknown protocol type: {protocolType}. Registered protocols: {string.Join(", ", _factories.Keys)}");
+    }
+}
diff --git a/src/ScadaLink.DataConnectionLayer/DataConnectionOptions.cs b/src/ScadaLink.DataConnectionLayer/DataConnectionOptions.cs
index 423d9aa..02073e0 100644
--- a/src/ScadaLink.DataConnectionLayer/DataConnectionOptions.cs
+++ b/src/ScadaLink.DataConnectionLayer/DataConnectionOptions.cs
@@ -1,8 +1,19 @@
namespace ScadaLink.DataConnectionLayer;
+/// <summary>
+/// Configuration options for the Data Connection Layer.
+/// </summary>
public class DataConnectionOptions
{
+ /// <summary>Fixed interval between reconnect attempts after disconnect (default 5 seconds).</summary>
public TimeSpan ReconnectInterval { get; set; } = TimeSpan.FromSeconds(5);
+
+ /// <summary>Interval for retrying failed tag path resolution (default 10 seconds).</summary>
public TimeSpan TagResolutionRetryInterval { get; set; } = TimeSpan.FromSeconds(10);
+
+ /// <summary>Timeout for synchronous write operations to devices (default 30 seconds).</summary>
public TimeSpan WriteTimeout { get; set; } = TimeSpan.FromSeconds(30);
+
+ /// <summary>LmxProxy keep-alive interval for gRPC sessions (default 30 seconds).</summary>
+ public TimeSpan LmxProxyKeepAliveInterval { get; set; } = TimeSpan.FromSeconds(30);
}
diff --git a/src/ScadaLink.DataConnectionLayer/IDataConnectionFactory.cs b/src/ScadaLink.DataConnectionLayer/IDataConnectionFactory.cs
new file mode 100644
index 0000000..12e9fed
--- /dev/null
+++ b/src/ScadaLink.DataConnectionLayer/IDataConnectionFactory.cs
@@ -0,0 +1,18 @@
+using ScadaLink.Commons.Interfaces.Protocol;
+
+namespace ScadaLink.DataConnectionLayer;
+
+/// <summary>
+/// WP-34: Factory for creating IDataConnection adapters based on protocol type.
+/// Adding a new protocol = implement IDataConnection + register with the factory.
+/// </summary>
+public interface IDataConnectionFactory
+{
+ /// <summary>
+ /// Creates an IDataConnection adapter for the specified protocol type.
+ /// </summary>
+ /// <param name="protocolType">Protocol identifier (e.g., "OpcUa", "LmxProxy").</param>
+ /// <param name="connectionDetails">Protocol-specific connection parameters.</param>
+ /// <returns>A configured but not yet connected IDataConnection instance.</returns>
+ IDataConnection Create(string protocolType, IDictionary connectionDetails);
+}
diff --git a/src/ScadaLink.DataConnectionLayer/ScadaLink.DataConnectionLayer.csproj b/src/ScadaLink.DataConnectionLayer/ScadaLink.DataConnectionLayer.csproj
index 049c7d9..adb43f8 100644
--- a/src/ScadaLink.DataConnectionLayer/ScadaLink.DataConnectionLayer.csproj
+++ b/src/ScadaLink.DataConnectionLayer/ScadaLink.DataConnectionLayer.csproj
@@ -8,8 +8,13 @@
+
+
+
+
+
diff --git a/src/ScadaLink.DataConnectionLayer/ServiceCollectionExtensions.cs b/src/ScadaLink.DataConnectionLayer/ServiceCollectionExtensions.cs
index 4c04de9..28fd163 100644
--- a/src/ScadaLink.DataConnectionLayer/ServiceCollectionExtensions.cs
+++ b/src/ScadaLink.DataConnectionLayer/ServiceCollectionExtensions.cs
@@ -6,13 +6,19 @@ public static class ServiceCollectionExtensions
{
public static IServiceCollection AddDataConnectionLayer(this IServiceCollection services)
{
- // Phase 0: skeleton only
+ services.AddOptions()
+ .BindConfiguration("DataConnectionLayer"); // options are bound from the "DataConnectionLayer" configuration section
+
+ // WP-34: Register the factory (as a singleton) for protocol extensibility
+ services.AddSingleton();
+
return services;
}
public static IServiceCollection AddDataConnectionLayerActors(this IServiceCollection services)
{
- // Phase 0: placeholder for Akka actor registration
+ // Registers nothing here: actor registration happens in AkkaHostedService or SiteCommunicationActor setup.
+ // DataConnectionManagerActor and DataConnectionActor instances are created by the actor system.
return services;
}
}
diff --git a/src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs b/src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs
new file mode 100644
index 0000000..67518e4
--- /dev/null
+++ b/src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs
@@ -0,0 +1,134 @@
+using System.Collections.Concurrent;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using ScadaLink.Commons.Messages.Health;
+
+namespace ScadaLink.HealthMonitoring;
+
+/// <summary>
+/// Central-side aggregator that receives health reports from all sites,
+/// tracks latest metrics in memory, and detects offline sites.
+/// No persistence — display-only for Central UI consumption.
+/// </summary>
+public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregator
+{
+    private readonly ConcurrentDictionary _siteStates = new();
+    private readonly HealthMonitoringOptions _options;
+    private readonly ILogger _logger;
+    private readonly TimeProvider _timeProvider;
+
+    // Serializes every read-modify-write of a SiteHealthState, so a report that arrives while the
+    // offline sweep is running cannot be overwritten by a stale "offline" decision (or vice versa).
+    private readonly object _updateGate = new();
+
+    public CentralHealthAggregator(
+        IOptions options,
+        ILogger logger,
+        TimeProvider? timeProvider = null)
+    {
+        _options = options.Value;
+        _logger = logger;
+        _timeProvider = timeProvider ?? TimeProvider.System;
+    }
+
+    /// <summary>
+    /// Process an incoming health report from a site.
+    /// The report replaces the stored state when its sequence number is greater than the last one
+    /// received, or when the site is currently marked offline: HealthReportSender restarts its sequence
+    /// at 1, so a restarted site would otherwise be rejected until it caught up with its old counter.
+    /// Auto-marks previously offline sites as online.
+    /// </summary>
+    public void ProcessReport(SiteHealthReport report)
+    {
+        var now = _timeProvider.GetUtcNow();
+        bool wasOffline;
+
+        // Updates run under the gate instead of inside ConcurrentDictionary.AddOrUpdate factories:
+        // those delegates execute outside the dictionary's locks and may be invoked more than once.
+        lock (_updateGate)
+        {
+            if (!_siteStates.TryGetValue(report.SiteId, out var existing))
+            {
+                _siteStates[report.SiteId] = new SiteHealthState
+                {
+                    SiteId = report.SiteId,
+                    LatestReport = report,
+                    LastReportReceivedAt = now,
+                    LastSequenceNumber = report.SequenceNumber,
+                    IsOnline = true
+                };
+                _logger.LogInformation("Site {SiteId} registered with sequence #{Seq}", report.SiteId, report.SequenceNumber);
+                return;
+            }
+
+            if (existing.IsOnline && report.SequenceNumber <= existing.LastSequenceNumber)
+            {
+                _logger.LogDebug(
+                    "Rejecting stale report from site {SiteId}: seq {Incoming} <= {Last}",
+                    report.SiteId, report.SequenceNumber, existing.LastSequenceNumber);
+                return;
+            }
+
+            wasOffline = !existing.IsOnline;
+            existing.LatestReport = report;
+            existing.LastReportReceivedAt = now;
+            existing.LastSequenceNumber = report.SequenceNumber;
+            existing.IsOnline = true;
+        }
+
+        if (wasOffline)
+        {
+            _logger.LogInformation("Site {SiteId} is back online (seq #{Seq})", report.SiteId, report.SequenceNumber);
+        }
+    }
+
+    /// <summary>Get a copy of the site-to-state map; the state objects themselves are shared, not cloned.</summary>
+    public IReadOnlyDictionary GetAllSiteStates() => new Dictionary(_siteStates);
+
+    /// <summary>Get the current health state for a specific site, or null if unknown.</summary>
+    public SiteHealthState? GetSiteState(string siteId) => _siteStates.TryGetValue(siteId, out var state) ? state : null;
+
+    /// <summary>
+    /// Background task that periodically checks for offline sites.
+    /// </summary>
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        _logger.LogInformation(
+            "Central health aggregator started, offline timeout {Timeout}s",
+            _options.OfflineTimeout.TotalSeconds);
+
+        // Check at half the offline timeout interval for timely detection
+        var checkInterval = TimeSpan.FromMilliseconds(_options.OfflineTimeout.TotalMilliseconds / 2);
+        using var timer = new PeriodicTimer(checkInterval);
+
+        while (await timer.WaitForNextTickAsync(stoppingToken).ConfigureAwait(false))
+        {
+            CheckForOfflineSites();
+        }
+    }
+
+    /// <summary>Marks every online site whose last report is older than the offline timeout as offline.</summary>
+    internal void CheckForOfflineSites()
+    {
+        var now = _timeProvider.GetUtcNow();
+
+        foreach (var kvp in _siteStates)
+        {
+            var state = kvp.Value;
+            TimeSpan elapsed;
+
+            // Decide and flip under the gate so a report processed concurrently is never clobbered.
+            lock (_updateGate)
+            {
+                elapsed = now - state.LastReportReceivedAt;
+                if (!state.IsOnline || elapsed <= _options.OfflineTimeout) continue;
+                state.IsOnline = false;
+            }
+
+            _logger.LogWarning(
+                "Site {SiteId} marked offline — no report for {Elapsed}s (timeout: {Timeout}s)",
+                state.SiteId, elapsed.TotalSeconds, _options.OfflineTimeout.TotalSeconds);
+        }
+    }
+}
diff --git a/src/ScadaLink.HealthMonitoring/HealthReportSender.cs b/src/ScadaLink.HealthMonitoring/HealthReportSender.cs
new file mode 100644
index 0000000..303934e
--- /dev/null
+++ b/src/ScadaLink.HealthMonitoring/HealthReportSender.cs
@@ -0,0 +1,69 @@
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using ScadaLink.Commons.Messages.Health;
+
+namespace ScadaLink.HealthMonitoring;
+
+/// <summary>
+/// Periodically collects a SiteHealthReport and sends it to central via Akka remoting.
+/// Sequence numbers are monotonic, starting at 1, and reset on service restart.
+/// </summary>
+public class HealthReportSender : BackgroundService
+{
+    private readonly ISiteHealthCollector _collector;
+    private readonly IHealthReportTransport _transport;
+    private readonly HealthMonitoringOptions _options;
+    private readonly ILogger _logger;
+    private readonly string _siteId;
+    private long _sequenceNumber;
+
+    public HealthReportSender(
+        ISiteHealthCollector collector,
+        IHealthReportTransport transport,
+        IOptions options,
+        ILogger logger,
+        ISiteIdentityProvider siteIdentityProvider)
+    {
+        _collector = collector;
+        _transport = transport;
+        _options = options.Value;
+        _logger = logger;
+        _siteId = siteIdentityProvider.SiteId;
+    }
+
+    /// <summary>Current sequence number (for testing).</summary>
+    public long CurrentSequenceNumber => Interlocked.Read(ref _sequenceNumber);
+
+    /// <summary>Sends one report per ReportInterval tick until the stopping token is cancelled.</summary>
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        _logger.LogInformation(
+            "Health report sender starting for site {SiteId}, interval {Interval}s",
+            _siteId, _options.ReportInterval.TotalSeconds);
+
+        using var ticker = new PeriodicTimer(_options.ReportInterval);
+        while (await ticker.WaitForNextTickAsync(stoppingToken).ConfigureAwait(false))
+            SendNextReport();
+    }
+
+    /// <summary>
+    /// Collects a report, stamps it with the next sequence number and hands it to the transport.
+    /// Any exception is logged as a warning so a single failed cycle never stops the reporting loop.
+    /// </summary>
+    private void SendNextReport()
+    {
+        try
+        {
+            var sequence = Interlocked.Increment(ref _sequenceNumber);
+            var stamped = _collector.CollectReport(_siteId) with { SequenceNumber = sequence };
+            _transport.Send(stamped);
+
+            _logger.LogDebug("Sent health report #{Seq} for site {SiteId}", sequence, _siteId);
+        }
+        catch (Exception ex)
+        {
+            _logger.LogWarning(ex, "Failed to send health report for site {SiteId}", _siteId);
+        }
+    }
+}
diff --git a/src/ScadaLink.HealthMonitoring/ICentralHealthAggregator.cs b/src/ScadaLink.HealthMonitoring/ICentralHealthAggregator.cs
new file mode 100644
index 0000000..4335caa
--- /dev/null
+++ b/src/ScadaLink.HealthMonitoring/ICentralHealthAggregator.cs
@@ -0,0 +1,14 @@
+using ScadaLink.Commons.Messages.Health;
+
+namespace ScadaLink.HealthMonitoring;
+
+/// <summary>
+/// Interface for central-side health aggregation.
+/// Consumed by Central UI to display site health dashboards.
+/// </summary>
+public interface ICentralHealthAggregator
+{
+ void ProcessReport(SiteHealthReport report); // ingest one incoming site report
+ IReadOnlyDictionary GetAllSiteStates(); // state of every known site
+ SiteHealthState? GetSiteState(string siteId); // nullable: CentralHealthAggregator returns null for an unknown site
+}
diff --git a/src/ScadaLink.HealthMonitoring/IHealthReportTransport.cs b/src/ScadaLink.HealthMonitoring/IHealthReportTransport.cs
new file mode 100644
index 0000000..1079e11
--- /dev/null
+++ b/src/ScadaLink.HealthMonitoring/IHealthReportTransport.cs
@@ -0,0 +1,12 @@
+using ScadaLink.Commons.Messages.Health;
+
+namespace ScadaLink.HealthMonitoring;
+
+/// <summary>
+/// Abstraction for sending health reports to central.
+/// In production, implemented via Akka remoting (Tell, fire-and-forget).
+/// </summary>
+public interface IHealthReportTransport
+{
+ void Send(SiteHealthReport report); // void signature: no delivery result is returned to the caller
+}
diff --git a/src/ScadaLink.HealthMonitoring/ISiteHealthCollector.cs b/src/ScadaLink.HealthMonitoring/ISiteHealthCollector.cs
new file mode 100644
index 0000000..95b9128
--- /dev/null
+++ b/src/ScadaLink.HealthMonitoring/ISiteHealthCollector.cs
@@ -0,0 +1,19 @@
+using ScadaLink.Commons.Messages.Health;
+using ScadaLink.Commons.Types.Enums;
+
+namespace ScadaLink.HealthMonitoring;
+
+/// <summary>
+/// Interface for site-side health metric collection.
+/// Consumed by Site Runtime actors to report errors, and by DCL to report connection health.
+/// </summary>
+public interface ISiteHealthCollector
+{
+ void IncrementScriptError();
+ void IncrementAlarmError();
+ void IncrementDeadLetter();
+ void UpdateConnectionHealth(string connectionName, ConnectionHealth health);
+ void RemoveConnection(string connectionName);
+ void UpdateTagResolution(string connectionName, int totalSubscribed, int successfullyResolved);
+ SiteHealthReport CollectReport(string siteId); // SiteHealthCollector also resets its per-interval error and dead-letter counters here
+}
diff --git a/src/ScadaLink.HealthMonitoring/ISiteIdentityProvider.cs b/src/ScadaLink.HealthMonitoring/ISiteIdentityProvider.cs
new file mode 100644
index 0000000..d77c89a
--- /dev/null
+++ b/src/ScadaLink.HealthMonitoring/ISiteIdentityProvider.cs
@@ -0,0 +1,10 @@
+namespace ScadaLink.HealthMonitoring;
+
+/// <summary>
+/// Provides the identity of the current site.
+/// Implemented by the Host component to supply configuration-driven site ID.
+/// </summary>
+public interface ISiteIdentityProvider
+{
+ string SiteId { get; } // read once by HealthReportSender's constructor
+}
diff --git a/src/ScadaLink.HealthMonitoring/ScadaLink.HealthMonitoring.csproj b/src/ScadaLink.HealthMonitoring/ScadaLink.HealthMonitoring.csproj
index 049c7d9..04bf1bf 100644
--- a/src/ScadaLink.HealthMonitoring/ScadaLink.HealthMonitoring.csproj
+++ b/src/ScadaLink.HealthMonitoring/ScadaLink.HealthMonitoring.csproj
@@ -9,6 +9,8 @@
+
+
@@ -16,4 +18,8 @@
+
+
+
+
diff --git a/src/ScadaLink.HealthMonitoring/ServiceCollectionExtensions.cs b/src/ScadaLink.HealthMonitoring/ServiceCollectionExtensions.cs
index b7c6d03..64353c0 100644
--- a/src/ScadaLink.HealthMonitoring/ServiceCollectionExtensions.cs
+++ b/src/ScadaLink.HealthMonitoring/ServiceCollectionExtensions.cs
@@ -4,15 +4,30 @@ namespace ScadaLink.HealthMonitoring;
public static class ServiceCollectionExtensions
{
+ /// <summary>
+ /// Register site-side health monitoring services.
+ /// </summary>
public static IServiceCollection AddHealthMonitoring(this IServiceCollection services)
{
- // Phase 0: skeleton only
+ services.AddSingleton();
+ services.AddHostedService();
+ return services;
+ }
+
+ /// <summary>
+ /// Register central-side health aggregation services.
+ /// </summary>
+ public static IServiceCollection AddCentralHealthAggregation(this IServiceCollection services)
+ {
+ services.AddSingleton();
+ services.AddSingleton(sp => sp.GetRequiredService()); // NOTE(review): generic arguments are not visible here; presumably this and the next line resolve the singleton registered above — confirm
+ services.AddHostedService(sp => sp.GetRequiredService());
return services;
}
public static IServiceCollection AddHealthMonitoringActors(this IServiceCollection services)
{
- // Phase 0: placeholder for Akka actor registration
+ // Placeholder for Akka actor registration (Phase 4+); nothing is registered yet
return services;
}
}
diff --git a/src/ScadaLink.HealthMonitoring/SiteHealthCollector.cs b/src/ScadaLink.HealthMonitoring/SiteHealthCollector.cs
new file mode 100644
index 0000000..d245b08
--- /dev/null
+++ b/src/ScadaLink.HealthMonitoring/SiteHealthCollector.cs
@@ -0,0 +1,101 @@
+using System.Collections.Concurrent;
+using ScadaLink.Commons.Messages.Health;
+using ScadaLink.Commons.Types.Enums;
+
+namespace ScadaLink.HealthMonitoring;
+
+/// <summary>
+/// Collects health metrics from all site subsystems.
+/// Thread-safe: counters use Interlocked operations, connection/tag data uses ConcurrentDictionary.
+/// </summary>
+public class SiteHealthCollector : ISiteHealthCollector
+{
+    private int _scriptErrorCount;
+    private int _alarmErrorCount;
+    private int _deadLetterCount;
+    private readonly ConcurrentDictionary _connectionStatuses = new();
+    private readonly ConcurrentDictionary _tagResolutionCounts = new();
+    private readonly TimeProvider _timeProvider;
+
+    /// <summary>
+    /// Creates a collector. <paramref name="timeProvider"/> supplies the report timestamp and defaults to
+    /// TimeProvider.System, mirroring CentralHealthAggregator so tests can control the clock.
+    /// </summary>
+    public SiteHealthCollector(TimeProvider? timeProvider = null)
+    {
+        _timeProvider = timeProvider ?? TimeProvider.System;
+    }
+
+    /// <summary>
+    /// Increment the script error counter. Covers unhandled exceptions,
+    /// timeouts, and recursion limit violations.
+    /// </summary>
+    public void IncrementScriptError() => Interlocked.Increment(ref _scriptErrorCount);
+
+    /// <summary>
+    /// Increment the alarm evaluation error counter.
+    /// </summary>
+    public void IncrementAlarmError() => Interlocked.Increment(ref _alarmErrorCount);
+
+    /// <summary>
+    /// Increment the dead letter counter for this reporting interval.
+    /// </summary>
+    public void IncrementDeadLetter() => Interlocked.Increment(ref _deadLetterCount);
+
+    /// <summary>
+    /// Update the health status for a named data connection.
+    /// Called by DCL when connection state changes.
+    /// </summary>
+    public void UpdateConnectionHealth(string connectionName, ConnectionHealth health)
+        => _connectionStatuses[connectionName] = health;
+
+    /// <summary>
+    /// Remove a connection from tracking (e.g., on connection disposal).
+    /// Both its status and its tag resolution counts are dropped.
+    /// </summary>
+    public void RemoveConnection(string connectionName)
+    {
+        _connectionStatuses.TryRemove(connectionName, out _);
+        _tagResolutionCounts.TryRemove(connectionName, out _);
+    }
+
+    /// <summary>
+    /// Update tag resolution counts for a named data connection.
+    /// Called by DCL after tag resolution attempts.
+    /// </summary>
+    public void UpdateTagResolution(string connectionName, int totalSubscribed, int successfullyResolved)
+        => _tagResolutionCounts[connectionName] = new TagResolutionStatus(totalSubscribed, successfullyResolved);
+
+    /// <summary>
+    /// Collect the current health report for the site and reset interval counters.
+    /// Connection statuses and tag resolution counts are NOT reset (they reflect current state).
+    /// Script errors, alarm errors, and dead letters ARE reset (they are per-interval counts).
+    /// </summary>
+    /// <param name="siteId">Site identifier copied into the report.</param>
+    /// <returns>A report with SequenceNumber 0; the caller is expected to assign the real one.</returns>
+    public SiteHealthReport CollectReport(string siteId)
+    {
+        // Each counter is read and reset atomically (the three are not reset as a single unit)
+        var scriptErrors = Interlocked.Exchange(ref _scriptErrorCount, 0);
+        var alarmErrors = Interlocked.Exchange(ref _alarmErrorCount, 0);
+        var deadLetters = Interlocked.Exchange(ref _deadLetterCount, 0);
+
+        // Snapshot current connection and tag resolution state
+        var connectionStatuses = new Dictionary(_connectionStatuses);
+        var tagResolution = new Dictionary(_tagResolutionCounts);
+
+        // S&F buffer depth: placeholder (Phase 3C)
+        var sfBufferDepths = new Dictionary();
+
+        return new SiteHealthReport(
+            SiteId: siteId,
+            SequenceNumber: 0, // Caller (HealthReportSender) assigns the sequence number
+            ReportTimestamp: _timeProvider.GetUtcNow(),
+            DataConnectionStatuses: connectionStatuses,
+            TagResolutionCounts: tagResolution,
+            ScriptErrorCount: scriptErrors,
+            AlarmEvaluationErrorCount: alarmErrors,
+            StoreAndForwardBufferDepths: sfBufferDepths,
+            DeadLetterCount: deadLetters);
+    }
+}
diff --git a/src/ScadaLink.HealthMonitoring/SiteHealthState.cs b/src/ScadaLink.HealthMonitoring/SiteHealthState.cs
new file mode 100644
index 0000000..bd23cea
--- /dev/null
+++ b/src/ScadaLink.HealthMonitoring/SiteHealthState.cs
@@ -0,0 +1,15 @@
+using ScadaLink.Commons.Messages.Health;
+
+namespace ScadaLink.HealthMonitoring;
+
+/// <summary>
+/// In-memory state for a single site's health, stored by the central aggregator.
+/// </summary>
+public class SiteHealthState
+{
+ public required string SiteId { get; init; }
+ public SiteHealthReport LatestReport { get; set; } = null!; // null! only silences the compiler; CentralHealthAggregator assigns it when it creates the state
+ public DateTimeOffset LastReportReceivedAt { get; set; } // aggregator-side receive time, not the report's own timestamp
+ public long LastSequenceNumber { get; set; }
+ public bool IsOnline { get; set; }
+}
diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs
index f421adf..e0f18b0 100644
--- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs
+++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs
@@ -4,10 +4,14 @@ using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Microsoft.Extensions.Options;
using ScadaLink.ClusterInfrastructure;
+using ScadaLink.Communication;
+using ScadaLink.Communication.Actors;
using ScadaLink.Host.Actors;
using ScadaLink.SiteRuntime;
using ScadaLink.SiteRuntime.Actors;
using ScadaLink.SiteRuntime.Persistence;
+using ScadaLink.SiteRuntime.Scripts;
+using ScadaLink.SiteRuntime.Streaming;
namespace ScadaLink.Host.Actors;
@@ -15,12 +19,15 @@ namespace ScadaLink.Host.Actors;
/// Hosted service that manages the Akka.NET actor system lifecycle.
/// Creates the actor system on start, registers actors, and triggers
/// CoordinatedShutdown on stop.
+///
+/// WP-3: Transport heartbeat is explicitly configured in HOCON from CommunicationOptions.
///
public class AkkaHostedService : IHostedService
{
private readonly IServiceProvider _serviceProvider;
private readonly NodeOptions _nodeOptions;
private readonly ClusterOptions _clusterOptions;
+ private readonly CommunicationOptions _communicationOptions;
private readonly ILogger _logger;
private ActorSystem? _actorSystem;
@@ -28,11 +35,13 @@ public class AkkaHostedService : IHostedService
IServiceProvider serviceProvider,
IOptions nodeOptions,
IOptions clusterOptions,
+ IOptions communicationOptions,
ILogger logger)
{
_serviceProvider = serviceProvider;
_nodeOptions = nodeOptions.Value;
_clusterOptions = clusterOptions.Value;
+ _communicationOptions = communicationOptions.Value;
_logger = logger;
}
@@ -50,6 +59,10 @@ public class AkkaHostedService : IHostedService
var roles = BuildRoles();
var rolesStr = string.Join(",", roles.Select(r => $"\"{r}\""));
+ // WP-3: Transport heartbeat explicitly configured from CommunicationOptions (not framework defaults)
+ var transportHeartbeatSec = _communicationOptions.TransportHeartbeatInterval.TotalSeconds;
+ var transportFailureSec = _communicationOptions.TransportFailureThreshold.TotalSeconds;
+
var hocon = $@"
akka {{
actor {{
@@ -60,6 +73,10 @@ akka {{
hostname = ""{_nodeOptions.NodeHostname}""
port = {_nodeOptions.RemotingPort}
}}
+ transport-failure-detector {{
+ heartbeat-interval = {transportHeartbeatSec:F0}s
+ acceptable-heartbeat-pause = {transportFailureSec:F0}s
+ }}
}}
cluster {{
seed-nodes = [{seedNodesStr}]
@@ -87,11 +104,14 @@ akka {{
_actorSystem = ActorSystem.Create("scadalink", config);
_logger.LogInformation(
- "Akka.NET actor system 'scadalink' started. Role={Role}, Roles={Roles}, Hostname={Hostname}, Port={Port}",
+ "Akka.NET actor system 'scadalink' started. Role={Role}, Roles={Roles}, Hostname={Hostname}, Port={Port}, " +
+ "TransportHeartbeat={TransportHeartbeat}s, TransportFailure={TransportFailure}s",
_nodeOptions.Role,
string.Join(", ", roles),
_nodeOptions.NodeHostname,
- _nodeOptions.RemotingPort);
+ _nodeOptions.RemotingPort,
+ transportHeartbeatSec,
+ transportFailureSec);
// Register the dead letter monitor actor
var loggerFactory = _serviceProvider.GetRequiredService();
@@ -100,8 +120,12 @@ akka {{
Props.Create(() => new DeadLetterMonitorActor(dlmLogger)),
"dead-letter-monitor");
- // For site nodes, register the Deployment Manager as a cluster singleton
- if (_nodeOptions.Role.Equals("Site", StringComparison.OrdinalIgnoreCase))
+ // Register role-specific actors
+ if (_nodeOptions.Role.Equals("Central", StringComparison.OrdinalIgnoreCase))
+ {
+ RegisterCentralActors();
+ }
+ else if (_nodeOptions.Role.Equals("Site", StringComparison.OrdinalIgnoreCase))
{
RegisterSiteActors();
}
@@ -138,7 +162,25 @@ akka {{
}
///
- /// Registers site-specific actors including the Deployment Manager cluster singleton.
+ /// Registers central-side actors including the CentralCommunicationActor.
+ /// WP-4: Central communication actor routes all 8 message patterns to sites.
+ ///
+ private void RegisterCentralActors()
+ {
+     // The actor takes no constructor arguments; it is created under the fixed name "central-communication".
+     var actorProps = Props.Create(() => new CentralCommunicationActor());
+     var communicationActor = _actorSystem!.ActorOf(actorProps, "central-communication");
+
+     // Hand the actor reference to the CommunicationService when one is registered;
+     // GetService returns null otherwise and the wiring is silently skipped.
+     _serviceProvider.GetService()?.SetCommunicationActor(communicationActor);
+
+     _logger.LogInformation("Central actors registered. CentralCommunicationActor created.");
+ }
+
+ ///
+ /// Registers site-specific actors including the Deployment Manager cluster singleton
+ /// and the SiteCommunicationActor.
/// The singleton is scoped to the site-specific cluster role so it runs on exactly
/// one node within this site's cluster.
///
@@ -146,6 +188,9 @@ akka {{
{
var siteRole = $"site-{_nodeOptions.SiteId}";
var storage = _serviceProvider.GetRequiredService();
+ var compilationService = _serviceProvider.GetRequiredService();
+ var sharedScriptLibrary = _serviceProvider.GetRequiredService();
+ var streamManager = _serviceProvider.GetService();
var siteRuntimeOptionsValue = _serviceProvider.GetService>()?.Value
?? new SiteRuntimeOptions();
var dmLogger = _serviceProvider.GetRequiredService()
@@ -155,6 +200,9 @@ akka {{
var singletonProps = ClusterSingletonManager.Props(
singletonProps: Props.Create(() => new DeploymentManagerActor(
storage,
+ compilationService,
+ sharedScriptLibrary,
+ streamManager,
siteRuntimeOptionsValue,
dmLogger)),
terminationMessage: PoisonPill.Instance,
@@ -171,10 +219,18 @@ akka {{
.WithRole(siteRole)
.WithSingletonName("deployment-manager"));
- _actorSystem.ActorOf(proxyProps, "deployment-manager-proxy");
+ var dmProxy = _actorSystem.ActorOf(proxyProps, "deployment-manager-proxy");
+
+ // WP-4: Create SiteCommunicationActor for receiving messages from central
+ _actorSystem.ActorOf(
+ Props.Create(() => new SiteCommunicationActor(
+ _nodeOptions.SiteId!,
+ _communicationOptions,
+ dmProxy)),
+ "site-communication");
_logger.LogInformation(
- "Site actors registered. DeploymentManager singleton scoped to role={SiteRole}",
+ "Site actors registered. DeploymentManager singleton scoped to role={SiteRole}, SiteCommunicationActor created.",
siteRole);
}
}
diff --git a/src/ScadaLink.SiteEventLogging/EventLogPurgeService.cs b/src/ScadaLink.SiteEventLogging/EventLogPurgeService.cs
new file mode 100644
index 0000000..07cd7ed
--- /dev/null
+++ b/src/ScadaLink.SiteEventLogging/EventLogPurgeService.cs
@@ -0,0 +1,174 @@
+using Microsoft.Data.Sqlite;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+
+namespace ScadaLink.SiteEventLogging;
+
+/// <summary>
+/// Background service that periodically purges old events from the SQLite event log.
+/// Enforces both time-based retention (default 30 days) and storage cap (default 1GB).
+/// Purges run on the <see cref="SiteEventLogger"/> connection, away from the host startup path.
+/// </summary>
+/// <remarks>
+/// NOTE(review): the purge commands do not take the logger's private write lock, so they can
+/// overlap with event inserts on the same connection — confirm this is acceptable.
+/// </remarks>
+public class EventLogPurgeService : BackgroundService
+{
+    private readonly SiteEventLogger _eventLogger;
+    private readonly SiteEventLogOptions _options;
+    private readonly ILogger<EventLogPurgeService> _logger;
+
+    /// <summary>
+    /// Creates the purge service.
+    /// </summary>
+    /// <param name="eventLogger">Event logger; must be a <see cref="SiteEventLogger"/> instance.</param>
+    /// <param name="options">Retention, storage-cap and purge-interval settings.</param>
+    /// <param name="logger">Logger for purge diagnostics.</param>
+    /// <exception cref="InvalidCastException">
+    /// <paramref name="eventLogger"/> is not a <see cref="SiteEventLogger"/>.
+    /// </exception>
+    public EventLogPurgeService(
+        ISiteEventLogger eventLogger,
+        IOptions<SiteEventLogOptions> options,
+        ILogger<EventLogPurgeService> logger)
+    {
+        // We need the concrete type to access the connection
+        _eventLogger = (SiteEventLogger)eventLogger;
+        _options = options.Value;
+        _logger = logger;
+    }
+
+    /// <summary>
+    /// Purges once at startup and then once per <see cref="SiteEventLogOptions.PurgeInterval"/>
+    /// until <paramref name="stoppingToken"/> is cancelled.
+    /// </summary>
+    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        _logger.LogInformation(
+            "Event log purge service started — retention: {Days} days, cap: {Cap} MB, interval: {Interval}",
+            _options.RetentionDays, _options.MaxStorageMb, _options.PurgeInterval);
+
+        // Created before yielding so an invalid interval still fails synchronously, as before.
+        using var timer = new PeriodicTimer(_options.PurgeInterval);
+
+        // The purge is synchronous SQLite work. Yield first so the initial purge does not run
+        // inline in the caller of ExecuteAsync (the host's StartAsync on older runtimes).
+        await Task.Yield();
+
+        // Run an initial purge on startup
+        RunPurge();
+
+        while (await timer.WaitForNextTickAsync(stoppingToken).ConfigureAwait(false))
+        {
+            RunPurge();
+        }
+    }
+
+    /// <summary>
+    /// Runs one purge pass: retention first, then the storage cap. Any exception is logged
+    /// and swallowed so a failed pass does not stop the service.
+    /// </summary>
+    internal void RunPurge()
+    {
+        try
+        {
+            PurgeByRetention();
+            PurgeByStorageCap();
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Error during event log purge");
+        }
+    }
+
+    /// <summary>
+    /// Deletes every event whose timestamp is older than the retention window.
+    /// </summary>
+    private void PurgeByRetention()
+    {
+        var cutoff = DateTimeOffset.UtcNow.AddDays(-_options.RetentionDays).ToString("o");
+
+        using var cmd = _eventLogger.Connection.CreateCommand();
+        cmd.CommandText = "DELETE FROM site_events WHERE timestamp < $cutoff";
+        cmd.Parameters.AddWithValue("$cutoff", cutoff);
+        var deleted = cmd.ExecuteNonQuery();
+
+        if (deleted > 0)
+        {
+            _logger.LogInformation("Purged {Count} events older than {Days} days", deleted, _options.RetentionDays);
+        }
+    }
+
+    /// <summary>
+    /// Deletes the oldest events in batches of 1000 until the space occupied by live data
+    /// is within the configured cap.
+    /// </summary>
+    private void PurgeByStorageCap()
+    {
+        var capBytes = (long)_options.MaxStorageMb * 1024 * 1024;
+
+        // Measure live pages only. page_count alone does not shrink after a DELETE unless the
+        // database is in incremental auto-vacuum mode, so comparing it with the cap would keep
+        // this loop deleting until the table is empty.
+        var liveSizeBytes = GetLiveSizeBytes();
+
+        if (liveSizeBytes <= capBytes)
+            return;
+
+        _logger.LogWarning(
+            "Event log size {Size:F1} MB exceeds cap {Cap} MB — purging oldest events",
+            liveSizeBytes / (1024.0 * 1024.0), _options.MaxStorageMb);
+
+        // Delete oldest events in batches until under the cap
+        while (liveSizeBytes > capBytes)
+        {
+            using var cmd = _eventLogger.Connection.CreateCommand();
+            cmd.CommandText = """
+                DELETE FROM site_events WHERE id IN (
+                    SELECT id FROM site_events ORDER BY id ASC LIMIT 1000
+                )
+                """;
+            var deleted = cmd.ExecuteNonQuery();
+            if (deleted == 0) break;
+
+            // Hand freed pages back to the file system (no-op unless auto_vacuum=incremental)
+            using var vacuumCmd = _eventLogger.Connection.CreateCommand();
+            vacuumCmd.CommandText = "PRAGMA incremental_vacuum";
+            vacuumCmd.ExecuteNonQuery();
+
+            liveSizeBytes = GetLiveSizeBytes();
+        }
+    }
+
+    /// <summary>
+    /// Returns the database size in bytes, computed as <c>page_count * page_size</c>.
+    /// This includes pages on the freelist.
+    /// </summary>
+    internal long GetDatabaseSizeBytes()
+    {
+        return ReadPragmaInt64("page_count") * ReadPragmaInt64("page_size");
+    }
+
+    /// <summary>
+    /// Returns the bytes held by pages that are in use, i.e. excluding freelist pages.
+    /// </summary>
+    private long GetLiveSizeBytes()
+    {
+        var livePages = ReadPragmaInt64("page_count") - ReadPragmaInt64("freelist_count");
+        return livePages * ReadPragmaInt64("page_size");
+    }
+
+    /// <summary>
+    /// Executes <c>PRAGMA {name}</c> and returns its single integer result.
+    /// </summary>
+    /// <param name="name">Pragma name; callers pass fixed literals only.</param>
+    private long ReadPragmaInt64(string name)
+    {
+        using var cmd = _eventLogger.Connection.CreateCommand();
+        cmd.CommandText = $"PRAGMA {name}";
+        return (long)cmd.ExecuteScalar()!;
+    }
+}
diff --git a/src/ScadaLink.SiteEventLogging/EventLogQueryService.cs b/src/ScadaLink.SiteEventLogging/EventLogQueryService.cs
new file mode 100644
index 0000000..2b129bc
--- /dev/null
+++ b/src/ScadaLink.SiteEventLogging/EventLogQueryService.cs
@@ -0,0 +1,180 @@
+using Microsoft.Data.Sqlite;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using ScadaLink.Commons.Messages.RemoteQuery;
+
+namespace ScadaLink.SiteEventLogging;
+
+/// <summary>
+/// Processes event log queries locally against SQLite.
+/// Supports filtering by event_type, time range, instance_id, severity,
+/// and keyword search (LIKE on message and source).
+/// Uses keyset pagination with continuation token (last event ID).
+/// </summary>
+public class EventLogQueryService : IEventLogQueryService
+{
+    private readonly SiteEventLogger _eventLogger;
+    private readonly SiteEventLogOptions _options;
+    private readonly ILogger<EventLogQueryService> _logger;
+
+    /// <summary>
+    /// Creates the query service.
+    /// </summary>
+    /// <param name="eventLogger">Event logger; must be a <see cref="SiteEventLogger"/> instance,
+    /// because queries run on its SQLite connection.</param>
+    /// <param name="options">Supplies the default page size.</param>
+    /// <param name="logger">Logger for query failures.</param>
+    /// <exception cref="InvalidCastException">
+    /// <paramref name="eventLogger"/> is not a <see cref="SiteEventLogger"/>.
+    /// </exception>
+    public EventLogQueryService(
+        ISiteEventLogger eventLogger,
+        IOptions<SiteEventLogOptions> options,
+        ILogger<EventLogQueryService> logger)
+    {
+        _eventLogger = (SiteEventLogger)eventLogger;
+        _options = options.Value;
+        _logger = logger;
+    }
+
+    /// <summary>
+    /// Runs one page of an event log query.
+    /// </summary>
+    /// <param name="request">Filters, page size and optional continuation token.</param>
+    /// <returns>
+    /// The matching entries in ascending id order. On failure the exception is logged and a
+    /// response with <c>Success = false</c> and the exception message is returned; this method
+    /// does not throw for query errors.
+    /// </returns>
+    public EventLogQueryResponse ExecuteQuery(EventLogQueryRequest request)
+    {
+        try
+        {
+            var pageSize = request.PageSize > 0 ? request.PageSize : _options.QueryPageSize;
+
+            using var cmd = _eventLogger.Connection.CreateCommand();
+            var whereClauses = new List<string>();
+            var parameters = new List<SqliteParameter>();
+
+            // Keyset pagination: only return events with id > continuation token
+            if (request.ContinuationToken.HasValue)
+            {
+                whereClauses.Add("id > $afterId");
+                parameters.Add(new SqliteParameter("$afterId", request.ContinuationToken.Value));
+            }
+
+            // Stored timestamps are UTC round-trip strings and are compared as text, so the
+            // range bounds are converted to UTC before formatting.
+            if (request.From.HasValue)
+            {
+                whereClauses.Add("timestamp >= $from");
+                parameters.Add(new SqliteParameter("$from", request.From.Value.ToUniversalTime().ToString("o")));
+            }
+
+            if (request.To.HasValue)
+            {
+                whereClauses.Add("timestamp <= $to");
+                parameters.Add(new SqliteParameter("$to", request.To.Value.ToUniversalTime().ToString("o")));
+            }
+
+            if (!string.IsNullOrWhiteSpace(request.EventType))
+            {
+                whereClauses.Add("event_type = $eventType");
+                parameters.Add(new SqliteParameter("$eventType", request.EventType));
+            }
+
+            if (!string.IsNullOrWhiteSpace(request.Severity))
+            {
+                whereClauses.Add("severity = $severity");
+                parameters.Add(new SqliteParameter("$severity", request.Severity));
+            }
+
+            if (!string.IsNullOrWhiteSpace(request.InstanceId))
+            {
+                whereClauses.Add("instance_id = $instanceId");
+                parameters.Add(new SqliteParameter("$instanceId", request.InstanceId));
+            }
+
+            if (!string.IsNullOrWhiteSpace(request.KeywordFilter))
+            {
+                // Match the keyword literally: '%', '_' and '\' in it are escaped, so names such
+                // as "Pump_01" do not act as wildcard patterns.
+                whereClauses.Add(@"(message LIKE $keyword ESCAPE '\' OR source LIKE $keyword ESCAPE '\')");
+                parameters.Add(new SqliteParameter("$keyword", $"%{EscapeLikePattern(request.KeywordFilter)}%"));
+            }
+
+            var whereClause = whereClauses.Count > 0
+                ? "WHERE " + string.Join(" AND ", whereClauses)
+                : "";
+
+            // Fetch pageSize + 1 to determine if there are more results
+            cmd.CommandText = $"""
+                SELECT id, timestamp, event_type, severity, instance_id, source, message, details
+                FROM site_events
+                {whereClause}
+                ORDER BY id ASC
+                LIMIT $limit
+                """;
+            // Widened to long so PageSize == int.MaxValue cannot overflow into a negative LIMIT
+            cmd.Parameters.AddWithValue("$limit", (long)pageSize + 1);
+            foreach (var p in parameters)
+                cmd.Parameters.Add(p);
+
+            var entries = new List<EventLogEntry>();
+            using var reader = cmd.ExecuteReader();
+            while (reader.Read())
+            {
+                entries.Add(new EventLogEntry(
+                    Id: reader.GetInt64(0),
+                    // Timestamps are machine-written; parse them culture-independently
+                    Timestamp: DateTimeOffset.Parse(
+                        reader.GetString(1),
+                        System.Globalization.CultureInfo.InvariantCulture),
+                    EventType: reader.GetString(2),
+                    Severity: reader.GetString(3),
+                    InstanceId: reader.IsDBNull(4) ? null : reader.GetString(4),
+                    Source: reader.GetString(5),
+                    Message: reader.GetString(6),
+                    Details: reader.IsDBNull(7) ? null : reader.GetString(7)));
+            }
+
+            var hasMore = entries.Count > pageSize;
+            if (hasMore)
+            {
+                entries.RemoveAt(entries.Count - 1);
+            }
+
+            var continuationToken = entries.Count > 0 ? entries[^1].Id : (long?)null;
+
+            return new EventLogQueryResponse(
+                CorrelationId: request.CorrelationId,
+                SiteId: request.SiteId,
+                Entries: entries,
+                ContinuationToken: continuationToken,
+                HasMore: hasMore,
+                Success: true,
+                ErrorMessage: null,
+                Timestamp: DateTimeOffset.UtcNow);
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Failed to execute event log query: {CorrelationId}", request.CorrelationId);
+            return new EventLogQueryResponse(
+                CorrelationId: request.CorrelationId,
+                SiteId: request.SiteId,
+                Entries: [],
+                ContinuationToken: null,
+                HasMore: false,
+                Success: false,
+                ErrorMessage: ex.Message,
+                Timestamp: DateTimeOffset.UtcNow);
+        }
+    }
+
+    /// <summary>
+    /// Escapes the LIKE metacharacters in <paramref name="value"/> using <c>\</c> as the
+    /// escape character, matching the <c>ESCAPE '\'</c> clause used by the keyword filter.
+    /// </summary>
+    private static string EscapeLikePattern(string value) =>
+        value.Replace(@"\", @"\\").Replace("%", @"\%").Replace("_", @"\_");
+}
diff --git a/src/ScadaLink.SiteEventLogging/IEventLogQueryService.cs b/src/ScadaLink.SiteEventLogging/IEventLogQueryService.cs
new file mode 100644
index 0000000..24142b7
--- /dev/null
+++ b/src/ScadaLink.SiteEventLogging/IEventLogQueryService.cs
@@ -0,0 +1,12 @@
+using ScadaLink.Commons.Messages.RemoteQuery;
+
+namespace ScadaLink.SiteEventLogging;
+
+/// <summary>
+/// Interface for querying site event logs.
+/// Used by Communication Layer to process remote queries from central.
+/// </summary>
+public interface IEventLogQueryService
+{
+ EventLogQueryResponse ExecuteQuery(EventLogQueryRequest request);
+}
diff --git a/src/ScadaLink.SiteEventLogging/ISiteEventLogger.cs b/src/ScadaLink.SiteEventLogging/ISiteEventLogger.cs
new file mode 100644
index 0000000..53f41aa
--- /dev/null
+++ b/src/ScadaLink.SiteEventLogging/ISiteEventLogger.cs
@@ -0,0 +1,24 @@
+namespace ScadaLink.SiteEventLogging;
+
+/// <summary>
+/// Interface for recording operational events to the local SQLite event log.
+/// </summary>
+public interface ISiteEventLogger
+{
+ /// <summary>
+ /// Record an event asynchronously.
+ /// </summary>
+ /// <param name="eventType">Category: script, alarm, deployment, connection, store_and_forward, instance_lifecycle</param>
+ /// <param name="severity">Info, Warning, or Error</param>
+ /// <param name="instanceId">Optional instance ID associated with the event</param>
+ /// <param name="source">Source identifier, e.g., "ScriptActor:MonitorSpeed"</param>
+ /// <param name="message">Human-readable event description</param>
+ /// <param name="details">Optional JSON details (stack traces, compilation errors, etc.)</param>
+ Task LogEventAsync(
+ string eventType,
+ string severity,
+ string? instanceId,
+ string source,
+ string message,
+ string? details = null);
+}
diff --git a/src/ScadaLink.SiteEventLogging/ScadaLink.SiteEventLogging.csproj b/src/ScadaLink.SiteEventLogging/ScadaLink.SiteEventLogging.csproj
index 049c7d9..080a38e 100644
--- a/src/ScadaLink.SiteEventLogging/ScadaLink.SiteEventLogging.csproj
+++ b/src/ScadaLink.SiteEventLogging/ScadaLink.SiteEventLogging.csproj
@@ -8,7 +8,10 @@
+
+
+
@@ -16,4 +19,8 @@
+
+
+
+
diff --git a/src/ScadaLink.SiteEventLogging/ServiceCollectionExtensions.cs b/src/ScadaLink.SiteEventLogging/ServiceCollectionExtensions.cs
index 072743b..617fa76 100644
--- a/src/ScadaLink.SiteEventLogging/ServiceCollectionExtensions.cs
+++ b/src/ScadaLink.SiteEventLogging/ServiceCollectionExtensions.cs
@@ -4,15 +4,20 @@ namespace ScadaLink.SiteEventLogging;
public static class ServiceCollectionExtensions
{
+ /// <summary>
+ /// Register site event logging services (recording, purge, query).
+ /// </summary>
public static IServiceCollection AddSiteEventLogging(this IServiceCollection services)
{
- // Phase 0: skeleton only
+ services.AddSingleton<ISiteEventLogger, SiteEventLogger>();
+ services.AddSingleton<IEventLogQueryService, EventLogQueryService>();
+ services.AddHostedService<EventLogPurgeService>();
return services;
}
public static IServiceCollection AddSiteEventLoggingActors(this IServiceCollection services)
{
- // Phase 0: placeholder for Akka actor registration
+ // Placeholder for Akka actor registration (Phase 4+)
return services;
}
}
diff --git a/src/ScadaLink.SiteEventLogging/SiteEventLogOptions.cs b/src/ScadaLink.SiteEventLogging/SiteEventLogOptions.cs
index 5d12b36..ec7cc5a 100644
--- a/src/ScadaLink.SiteEventLogging/SiteEventLogOptions.cs
+++ b/src/ScadaLink.SiteEventLogging/SiteEventLogOptions.cs
@@ -4,5 +4,7 @@ public class SiteEventLogOptions
{
public int RetentionDays { get; set; } = 30;
public int MaxStorageMb { get; set; } = 1024;
- public string PurgeScheduleCron { get; set; } = "0 2 * * *";
+ public string DatabasePath { get; set; } = "site_events.db"; // SQLite file used as the logger connection's Data Source
+ public int QueryPageSize { get; set; } = 500; // page size applied when a query request carries no positive PageSize
+ public TimeSpan PurgeInterval { get; set; } = TimeSpan.FromHours(24); // period of the purge service's timer
}
diff --git a/src/ScadaLink.SiteEventLogging/SiteEventLogger.cs b/src/ScadaLink.SiteEventLogging/SiteEventLogger.cs
new file mode 100644
index 0000000..34f0ea1
--- /dev/null
+++ b/src/ScadaLink.SiteEventLogging/SiteEventLogger.cs
@@ -0,0 +1,154 @@
+using Microsoft.Data.Sqlite;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+
+namespace ScadaLink.SiteEventLogging;
+
+/// <summary>
+/// Records operational events to a local SQLite database.
+/// Only the active node generates events. Not replicated to standby.
+/// On failover, the new active node starts a fresh log.
+/// </summary>
+public class SiteEventLogger : ISiteEventLogger, IDisposable
+{
+    private readonly SqliteConnection _connection;
+    private readonly ILogger<SiteEventLogger> _logger;
+    private readonly object _writeLock = new();
+    private bool _disposed;
+
+    /// <summary>
+    /// Opens the event database and creates the schema if it does not exist yet.
+    /// </summary>
+    /// <param name="options">Supplies <see cref="SiteEventLogOptions.DatabasePath"/>.</param>
+    /// <param name="logger">Logger for write failures.</param>
+    /// <param name="connectionStringOverride">
+    /// When not null, used verbatim instead of the connection string built from the options.
+    /// </param>
+    public SiteEventLogger(
+        IOptions<SiteEventLogOptions> options,
+        ILogger<SiteEventLogger> logger,
+        string? connectionStringOverride = null)
+    {
+        _logger = logger;
+
+        var connectionString = connectionStringOverride
+            ?? $"Data Source={options.Value.DatabasePath};Cache=Shared";
+        _connection = new SqliteConnection(connectionString);
+
+        try
+        {
+            _connection.Open();
+            InitializeSchema();
+        }
+        catch
+        {
+            // Dispose is never reached when the constructor throws, so release the handle here
+            _connection.Dispose();
+            throw;
+        }
+    }
+
+    /// <summary>
+    /// The open connection, shared with the purge and query services in this assembly.
+    /// </summary>
+    internal SqliteConnection Connection => _connection;
+
+    /// <summary>
+    /// Creates the <c>site_events</c> table and its indexes when they are missing.
+    /// </summary>
+    private void InitializeSchema()
+    {
+        using var cmd = _connection.CreateCommand();
+        // auto_vacuum must be chosen before the first table is created; it lets the purge
+        // service's "PRAGMA incremental_vacuum" actually return freed pages. On a database
+        // that already has tables the pragma does not change the mode until a VACUUM is run.
+        cmd.CommandText = """
+            PRAGMA auto_vacuum = INCREMENTAL;
+            CREATE TABLE IF NOT EXISTS site_events (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                timestamp TEXT NOT NULL,
+                event_type TEXT NOT NULL,
+                severity TEXT NOT NULL,
+                instance_id TEXT,
+                source TEXT NOT NULL,
+                message TEXT NOT NULL,
+                details TEXT
+            );
+            CREATE INDEX IF NOT EXISTS idx_events_timestamp ON site_events(timestamp);
+            CREATE INDEX IF NOT EXISTS idx_events_type ON site_events(event_type);
+            CREATE INDEX IF NOT EXISTS idx_events_instance ON site_events(instance_id);
+            """;
+        cmd.ExecuteNonQuery();
+    }
+
+    /// <summary>
+    /// Records one event. The insert runs synchronously under the write lock, so the
+    /// returned task is already completed.
+    /// </summary>
+    /// <remarks>
+    /// Insert failures are logged and swallowed. Calls made after <see cref="Dispose"/> are ignored.
+    /// </remarks>
+    /// <exception cref="ArgumentException">
+    /// <paramref name="eventType"/>, <paramref name="severity"/>, <paramref name="source"/> or
+    /// <paramref name="message"/> is null, empty or whitespace.
+    /// </exception>
+    public Task LogEventAsync(
+        string eventType,
+        string severity,
+        string? instanceId,
+        string source,
+        string message,
+        string? details = null)
+    {
+        ArgumentException.ThrowIfNullOrWhiteSpace(eventType);
+        ArgumentException.ThrowIfNullOrWhiteSpace(severity);
+        ArgumentException.ThrowIfNullOrWhiteSpace(source);
+        ArgumentException.ThrowIfNullOrWhiteSpace(message);
+
+        var timestamp = DateTimeOffset.UtcNow.ToString("o");
+
+        lock (_writeLock)
+        {
+            if (_disposed) return Task.CompletedTask;
+
+            try
+            {
+                using var cmd = _connection.CreateCommand();
+                cmd.CommandText = """
+                    INSERT INTO site_events (timestamp, event_type, severity, instance_id, source, message, details)
+                    VALUES ($timestamp, $event_type, $severity, $instance_id, $source, $message, $details)
+                    """;
+                cmd.Parameters.AddWithValue("$timestamp", timestamp);
+                cmd.Parameters.AddWithValue("$event_type", eventType);
+                cmd.Parameters.AddWithValue("$severity", severity);
+                cmd.Parameters.AddWithValue("$instance_id", (object?)instanceId ?? DBNull.Value);
+                cmd.Parameters.AddWithValue("$source", source);
+                cmd.Parameters.AddWithValue("$message", message);
+                cmd.Parameters.AddWithValue("$details", (object?)details ?? DBNull.Value);
+                cmd.ExecuteNonQuery();
+            }
+            catch (Exception ex)
+            {
+                _logger.LogError(ex, "Failed to record event: {EventType} from {Source}", eventType, source);
+            }
+        }
+
+        return Task.CompletedTask;
+    }
+
+    /// <summary>
+    /// Closes the connection. Takes the write lock so an in-flight insert finishes before
+    /// the connection is disposed. Safe to call more than once.
+    /// </summary>
+    public void Dispose()
+    {
+        lock (_writeLock)
+        {
+            if (_disposed) return;
+            _disposed = true;
+            _connection.Dispose();
+        }
+
+        GC.SuppressFinalize(this);
+    }
+}
diff --git a/src/ScadaLink.SiteRuntime/Actors/AlarmActor.cs b/src/ScadaLink.SiteRuntime/Actors/AlarmActor.cs
new file mode 100644
index 0000000..8901956
--- /dev/null
+++ b/src/ScadaLink.SiteRuntime/Actors/AlarmActor.cs
@@ -0,0 +1,305 @@
+using Akka.Actor;
+using Microsoft.CodeAnalysis.Scripting;
+using Microsoft.Extensions.Logging;
+using ScadaLink.Commons.Messages.Streaming;
+using ScadaLink.Commons.Types.Enums;
+using ScadaLink.Commons.Types.Flattening;
+using ScadaLink.SiteRuntime.Scripts;
+using System.Text.Json;
+
+namespace ScadaLink.SiteRuntime.Actors;
+
+///
+/// WP-16: Alarm Actor — coordinator actor, child of Instance Actor, peer to Script Actors.
+/// Subscribes to attribute change notifications from Instance Actor.
+///
+/// Evaluates alarm conditions:
+/// - ValueMatch: attribute equals a specific value
+/// - RangeViolation: attribute outside min/max range
+/// - RateOfChange: attribute rate exceeds threshold (configurable window, default per-second)
+///
+/// State (active/normal) is in memory only, NOT persisted.
+/// On restart: starts normal, re-evaluates from incoming values.
+///
+/// WP-21: AlarmExecutionActor CAN call Instance.CallScript() (ask to sibling Script Actor).
+/// Instance scripts CANNOT call alarm on-trigger scripts (no Instance.CallAlarmScript API).
+///
+/// Supervision: Resume on exception; AlarmExecutionActor stopped on exception.
+///
+public class AlarmActor : ReceiveActor
+{
+ private readonly string _alarmName;
+ private readonly string _instanceName;
+ private readonly IActorRef _instanceActor;
+ private readonly SharedScriptLibrary _sharedScriptLibrary;
+ private readonly SiteRuntimeOptions _options;
+ private readonly ILogger _logger;
+
+ private AlarmState _currentState = AlarmState.Normal;
+ private readonly AlarmTriggerType _triggerType;
+ private readonly AlarmEvalConfig _evalConfig;
+ private readonly int _priority;
+ private readonly string? _onTriggerScriptName;
+ private readonly Script