diff --git a/CLAUDE.md b/CLAUDE.md
index 7198163..7c6fa53 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -41,7 +41,7 @@ This project contains design documentation for a distributed SCADA system built
2. Deployment Manager — Central-side deployment pipeline, system-wide artifact deployment, instance lifecycle.
3. Site Runtime — Site-side actor hierarchy (Deployment Manager singleton, Instance/Script/Alarm Actors), script compilation, Akka stream.
4. Data Connection Layer — Protocol abstraction (OPC UA, custom), subscription management, clean data pipe.
-5. Central–Site Communication — Akka.NET remoting, message patterns, debug streaming.
+5. Central–Site Communication — Akka.NET ClusterClient/ClusterClientReceptionist, message patterns, debug streaming.
6. Store-and-Forward Engine — Buffering, fixed-interval retry, parking, SQLite persistence, replication.
7. External System Gateway — External system definitions, API method invocation, database connections.
8. Notification Service — Notification lists, email delivery, store-and-forward integration.
@@ -78,7 +78,7 @@ This project contains design documentation for a distributed SCADA system built
- Tag path resolution retried periodically for devices still booting.
- Static attribute writes persisted to local SQLite (survive restart/failover, reset on redeployment).
- All timestamps are UTC throughout the system.
-- Site addressing is database-driven: NodeAAddress and NodeBAddress stored in the Sites table, cached in CentralCommunicationActor, refreshed periodically (60s) and on admin changes. Heartbeats serve health monitoring only.
+- Inter-cluster communication uses ClusterClient/ClusterClientReceptionist. Both CentralCommunicationActor and SiteCommunicationActor registered with receptionist. Central creates one ClusterClient per site using NodeA/NodeB as contact points. Sites configure multiple central contact points for failover. Addresses cached in CentralCommunicationActor, refreshed periodically (60s) and on admin changes. Heartbeats serve health monitoring only.
### External Integrations
- External System Gateway: HTTP/REST only, JSON serialization, API key + Basic Auth.
@@ -141,6 +141,7 @@ This project contains design documentation for a distributed SCADA system built
### Akka.NET Conventions
- Tell for hot-path internal communication; Ask reserved for system boundaries.
+- ClusterClient for cross-cluster communication; ClusterClientReceptionist for service discovery across cluster boundaries.
- Script trust model: forbidden APIs (System.IO, Process, Threading, Reflection, raw network).
- Application-level correlation IDs on all request/response messages.
diff --git a/Component-Communication.md b/Component-Communication.md
index 33e9a59..3a4e33d 100644
--- a/Component-Communication.md
+++ b/Component-Communication.md
@@ -11,7 +11,7 @@ Both central and site clusters. Each side has communication actors that handle m
## Responsibilities
- Resolve site addresses from the configuration database and maintain a cached address map.
-- Establish and maintain Akka.NET remoting connections between central and each site cluster.
+- Establish and maintain cross-cluster connections using Akka.NET ClusterClient/ClusterClientReceptionist.
- Route messages between central and site clusters in a hub-and-spoke topology.
- Broker requests from external systems (via central) to sites and return responses.
- Support multiple concurrent message patterns (request/response, fire-and-forget, streaming).
@@ -75,25 +75,35 @@ Both central and site clusters. Each side has communication actors that handle m
```
Central Cluster
- ├── Akka.NET Remoting → Site A Cluster
- ├── Akka.NET Remoting → Site B Cluster
- └── Akka.NET Remoting → Site N Cluster
+ ├── ClusterClient → Site A Cluster (SiteCommunicationActor via Receptionist)
+ ├── ClusterClient → Site B Cluster (SiteCommunicationActor via Receptionist)
+ └── ClusterClient → Site N Cluster (SiteCommunicationActor via Receptionist)
+
+Site Clusters
+ └── ClusterClient → Central Cluster (CentralCommunicationActor via Receptionist)
```
- Sites do **not** communicate with each other.
- All inter-cluster communication flows through central.
+- Both **CentralCommunicationActor** and **SiteCommunicationActor** are registered with their cluster's **ClusterClientReceptionist** for cross-cluster discovery.
## Site Address Resolution
Central discovers site addresses through the **configuration database**, not runtime registration:
-- Each site record in the Sites table includes optional **NodeAAddress** and **NodeBAddress** fields containing the Akka remoting paths of the site's cluster nodes.
-- The **CentralCommunicationActor** loads all site addresses from the database at startup and caches them in memory.
-- The cache is **refreshed every 60 seconds** and **on-demand** when site records are added, edited, or deleted via the Central UI or CLI.
-- When routing a message to a site, the actor **prefers NodeA** and **falls back to NodeB** if NodeA is unreachable.
+- Each site record in the Sites table includes optional **NodeAAddress** and **NodeBAddress** fields containing base Akka addresses of the site's cluster nodes (e.g., `akka.tcp://scadalink@host:port`).
+- The **CentralCommunicationActor** loads all site addresses from the database at startup and creates one **ClusterClient per site**, configured with both NodeA and NodeB as contact points.
+- The address cache is **refreshed every 60 seconds** and **on-demand** when site records are added, edited, or deleted via the Central UI or CLI. ClusterClient instances are recreated when contact points change.
+- When routing a message to a site, central sends via `ClusterClient.Send("/user/site-communication", msg)`. **ClusterClient handles failover between NodeA and NodeB internally** — there is no application-level NodeA preference/NodeB fallback logic.
- **Heartbeats** from sites serve **health monitoring only** — they do not serve as a registration or address discovery mechanism.
- If no addresses are configured for a site, messages to that site are **dropped** and the caller's Ask times out.
+### Site → Central Communication
+
+- Site nodes configure a list of **CentralContactPoints** (both central node addresses) instead of a single `CentralActorPath`.
+- The site creates a **ClusterClient** using the central contact points and sends heartbeats, health reports, and other messages via `ClusterClient.Send("/user/central-communication", msg)`.
+- ClusterClient handles automatic failover between central nodes — if the active central node goes down, the site's ClusterClient reconnects to the standby node transparently.
+
## Message Timeouts
Each request/response pattern has a default timeout that can be overridden in configuration:
@@ -111,11 +121,11 @@ Timeouts use the Akka.NET **ask pattern**. If no response is received within the
## Transport Configuration
-Akka.NET remoting provides built-in connection management and failure detection. The following transport-level settings are **explicitly configured** (not left to framework defaults) for predictable behavior:
+Akka.NET remoting provides the underlying transport for both intra-cluster communication and ClusterClient connections. The following transport-level settings are **explicitly configured** (not left to framework defaults) for predictable behavior:
- **Transport heartbeat interval**: Configurable interval at which heartbeat messages are sent over remoting connections (e.g., every 5 seconds).
- **Failure detection threshold**: Number of missed heartbeats before the connection is considered lost (e.g., 3 missed heartbeats = 15 seconds with a 5-second interval).
-- **Reconnection**: Akka.NET remoting handles reconnection automatically. No custom reconnection logic is required.
+- **Reconnection**: ClusterClient handles reconnection and failover between contact points automatically for cross-cluster communication. No custom reconnection logic is required.
These settings should be tuned for the expected network conditions between central and site clusters.
@@ -135,7 +145,7 @@ Akka.NET guarantees message ordering between a specific sender/receiver actor pa
## ManagementActor and ClusterClient
-The ManagementActor is registered at the well-known path `/user/management` on central nodes and advertised via **ClusterClientReceptionist**. External tools (primarily the CLI) connect using Akka.NET ClusterClient, which contacts the receptionist to discover the ManagementActor. ClusterClient is a separate communication channel from the inter-cluster remoting used for central-site messaging — it does not participate in cluster membership or affect the hub-and-spoke topology.
+The ManagementActor is registered at the well-known path `/user/management` on central nodes and advertised via **ClusterClientReceptionist**. External tools (primarily the CLI) connect using Akka.NET ClusterClient, which contacts the receptionist to discover the ManagementActor. This is a separate ClusterClient usage from the inter-cluster ClusterClient connections used for central-site messaging — the CLI does not participate in cluster membership or affect the hub-and-spoke topology.
## Connection Failure Behavior
@@ -144,12 +154,12 @@ The ManagementActor is registered at the well-known path `/user/management` on c
## Failover Behavior
-- **Central failover**: The standby node takes over the Akka.NET cluster role. In-progress deployments are treated as failed. Sites reconnect to the new active central node.
-- **Site failover**: The standby node takes over. The Deployment Manager singleton restarts and re-creates the Instance Actor hierarchy. Central detects the node change and reconnects. Ongoing debug streams are interrupted and must be re-established by the engineer.
+- **Central failover**: The standby node takes over the Akka.NET cluster role. In-progress deployments are treated as failed. Site ClusterClients automatically reconnect to the standby central node via their configured contact points.
+- **Site failover**: The standby node takes over. The Deployment Manager singleton restarts and re-creates the Instance Actor hierarchy. Central's per-site ClusterClient automatically reconnects to the surviving site node. Ongoing debug streams are interrupted and must be re-established by the engineer.
## Dependencies
-- **Akka.NET Remoting**: Provides the transport layer.
+- **Akka.NET Remoting + ClusterClient**: Provides the transport layer. ClusterClient/ClusterClientReceptionist used for all cross-cluster messaging.
- **Cluster Infrastructure**: Manages node roles and failover detection.
- **Configuration Database**: Provides site node addresses (NodeAAddress, NodeBAddress) for address resolution.
diff --git a/HighLevelReqs.md b/HighLevelReqs.md
index a3ea5df..4c43eb4 100644
--- a/HighLevelReqs.md
+++ b/HighLevelReqs.md
@@ -45,8 +45,9 @@
- **Machine Data Database**: A separate database for collected machine data (e.g., telemetry, measurements, events).
### 2.2 Communication: Central ↔ Site
-- Central-to-site and site-to-central communication uses **Akka.NET** (remoting/cluster).
-- **Site addressing**: Site Akka remoting addresses (NodeA and NodeB) are stored in the **Sites database table** and configured via the Central UI. Central resolves site addresses from the database (cached in memory, refreshed periodically and on admin changes) rather than relying on runtime registration messages from sites.
+- Central-to-site and site-to-central communication uses **Akka.NET ClusterClient/ClusterClientReceptionist** for cross-cluster messaging with automatic failover.
+- **Site addressing**: Site Akka base addresses (NodeA and NodeB) are stored in the **Sites database table** and configured via the Central UI. Central creates a ClusterClient per site using both addresses as contact points (cached in memory, refreshed periodically and on admin changes) rather than relying on runtime registration messages from sites.
+- **Central contact points**: Sites configure **multiple central contact points** (both central node addresses) for redundancy. ClusterClient handles failover between central nodes automatically.
- **Central as integration hub**: Central brokers requests between external systems and sites. For example, a recipe manager sends a recipe to central, which routes it to the appropriate site. MES requests machine values from central, which routes the request to the site and returns the response.
- **Real-time data streaming** is not continuous for all machine data. The only real-time stream is an **on-demand debug view** — an engineer in the central UI can open a live view of a specific instance's tag values and alarm states for troubleshooting purposes. This is session-based and temporary. The debug view subscribes to the site-wide Akka stream filtered by instance (see Section 8.1).
diff --git a/docker/site-a-node-a/appsettings.Site.json b/docker/site-a-node-a/appsettings.Site.json
index 234fbc4..a24db2c 100644
--- a/docker/site-a-node-a/appsettings.Site.json
+++ b/docker/site-a-node-a/appsettings.Site.json
@@ -30,7 +30,10 @@
"ReplicationEnabled": true
},
"Communication": {
- "CentralActorPath": "akka.tcp://scadalink@scadalink-central-a:8081/user/central-communication",
+ "CentralContactPoints": [
+ "akka.tcp://scadalink@scadalink-central-a:8081",
+ "akka.tcp://scadalink@scadalink-central-b:8081"
+ ],
"DeploymentTimeout": "00:02:00",
"LifecycleTimeout": "00:00:30",
"QueryTimeout": "00:00:30",
diff --git a/docker/site-a-node-b/appsettings.Site.json b/docker/site-a-node-b/appsettings.Site.json
index d7319e6..45d83fc 100644
--- a/docker/site-a-node-b/appsettings.Site.json
+++ b/docker/site-a-node-b/appsettings.Site.json
@@ -30,7 +30,10 @@
"ReplicationEnabled": true
},
"Communication": {
- "CentralActorPath": "akka.tcp://scadalink@scadalink-central-a:8081/user/central-communication",
+ "CentralContactPoints": [
+ "akka.tcp://scadalink@scadalink-central-a:8081",
+ "akka.tcp://scadalink@scadalink-central-b:8081"
+ ],
"DeploymentTimeout": "00:02:00",
"LifecycleTimeout": "00:00:30",
"QueryTimeout": "00:00:30",
diff --git a/docker/site-b-node-a/appsettings.Site.json b/docker/site-b-node-a/appsettings.Site.json
index d885a4c..bd7e168 100644
--- a/docker/site-b-node-a/appsettings.Site.json
+++ b/docker/site-b-node-a/appsettings.Site.json
@@ -30,7 +30,10 @@
"ReplicationEnabled": true
},
"Communication": {
- "CentralActorPath": "akka.tcp://scadalink@scadalink-central-a:8081/user/central-communication",
+ "CentralContactPoints": [
+ "akka.tcp://scadalink@scadalink-central-a:8081",
+ "akka.tcp://scadalink@scadalink-central-b:8081"
+ ],
"DeploymentTimeout": "00:02:00",
"LifecycleTimeout": "00:00:30",
"QueryTimeout": "00:00:30",
diff --git a/docker/site-b-node-b/appsettings.Site.json b/docker/site-b-node-b/appsettings.Site.json
index fc76f8d..8cdeb36 100644
--- a/docker/site-b-node-b/appsettings.Site.json
+++ b/docker/site-b-node-b/appsettings.Site.json
@@ -30,7 +30,10 @@
"ReplicationEnabled": true
},
"Communication": {
- "CentralActorPath": "akka.tcp://scadalink@scadalink-central-a:8081/user/central-communication",
+ "CentralContactPoints": [
+ "akka.tcp://scadalink@scadalink-central-a:8081",
+ "akka.tcp://scadalink@scadalink-central-b:8081"
+ ],
"DeploymentTimeout": "00:02:00",
"LifecycleTimeout": "00:00:30",
"QueryTimeout": "00:00:30",
diff --git a/docker/site-c-node-a/appsettings.Site.json b/docker/site-c-node-a/appsettings.Site.json
index 6692f73..50c8dce 100644
--- a/docker/site-c-node-a/appsettings.Site.json
+++ b/docker/site-c-node-a/appsettings.Site.json
@@ -30,7 +30,10 @@
"ReplicationEnabled": true
},
"Communication": {
- "CentralActorPath": "akka.tcp://scadalink@scadalink-central-a:8081/user/central-communication",
+ "CentralContactPoints": [
+ "akka.tcp://scadalink@scadalink-central-a:8081",
+ "akka.tcp://scadalink@scadalink-central-b:8081"
+ ],
"DeploymentTimeout": "00:02:00",
"LifecycleTimeout": "00:00:30",
"QueryTimeout": "00:00:30",
diff --git a/docker/site-c-node-b/appsettings.Site.json b/docker/site-c-node-b/appsettings.Site.json
index c8ecd2b..686d988 100644
--- a/docker/site-c-node-b/appsettings.Site.json
+++ b/docker/site-c-node-b/appsettings.Site.json
@@ -30,7 +30,10 @@
"ReplicationEnabled": true
},
"Communication": {
- "CentralActorPath": "akka.tcp://scadalink@scadalink-central-a:8081/user/central-communication",
+ "CentralContactPoints": [
+ "akka.tcp://scadalink@scadalink-central-a:8081",
+ "akka.tcp://scadalink@scadalink-central-b:8081"
+ ],
"DeploymentTimeout": "00:02:00",
"LifecycleTimeout": "00:00:30",
"QueryTimeout": "00:00:30",
diff --git a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs
index 883f0bc..f31bde5 100644
--- a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs
+++ b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs
@@ -1,4 +1,6 @@
+using System.Collections.Immutable;
using Akka.Actor;
+using Akka.Cluster.Tools.Client;
using Akka.Event;
using Microsoft.Extensions.DependencyInjection;
using ScadaLink.Commons.Interfaces.Repositories;
@@ -9,8 +11,29 @@ using ScadaLink.HealthMonitoring;
namespace ScadaLink.Communication.Actors;
///
-/// Central-side actor that routes messages from central to site clusters via Akka remoting.
-/// Resolves site addresses from the database on a periodic refresh cycle.
+/// Abstraction for creating ClusterClient instances per site, enabling testability.
+///
+public interface ISiteClientFactory
+{
+ IActorRef Create(ActorSystem system, string siteId, ImmutableHashSet contacts);
+}
+
+///
+/// Default implementation that creates a real ClusterClient for each site.
+///
+public class DefaultSiteClientFactory : ISiteClientFactory
+{
+ public IActorRef Create(ActorSystem system, string siteId, ImmutableHashSet contacts)
+ {
+ var settings = ClusterClientSettings.Create(system).WithInitialContacts(contacts);
+ return system.ActorOf(ClusterClient.Props(settings), $"site-client-{siteId}");
+ }
+}
+
+///
+/// Central-side actor that routes messages from central to site clusters via ClusterClient.
+/// Resolves site addresses from the database on a periodic refresh cycle and manages
+/// per-site ClusterClient instances.
///
/// WP-4: All 8 message patterns routed through this actor.
/// WP-5: Ask timeout on connection drop (no central buffering). Debug streams killed on interruption.
@@ -19,13 +42,14 @@ public class CentralCommunicationActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly IServiceProvider _serviceProvider;
+ private readonly ISiteClientFactory _siteClientFactory;
///
- /// Cached site address entries loaded from the database.
- /// Maps SiteIdentifier → (NodeA selection, NodeB selection).
+ /// Per-site ClusterClient instances and their contact addresses.
+ /// Maps SiteIdentifier → (ClusterClient actor, set of contact address strings).
/// Refreshed periodically via RefreshSiteAddresses.
///
- private Dictionary _siteAddressCache = new();
+ private Dictionary ContactAddresses)> _siteClients = new();
///
/// Tracks active debug view subscriptions: correlationId → (siteId, subscriber).
@@ -41,9 +65,10 @@ public class CentralCommunicationActor : ReceiveActor
private ICancelable? _refreshSchedule;
- public CentralCommunicationActor(IServiceProvider serviceProvider)
+ public CentralCommunicationActor(IServiceProvider serviceProvider, ISiteClientFactory siteClientFactory)
{
_serviceProvider = serviceProvider;
+ _siteClientFactory = siteClientFactory;
// Site address cache loaded from database
Receive(HandleSiteAddressCacheLoaded);
@@ -112,7 +137,7 @@ public class CentralCommunicationActor : ReceiveActor
_inProgressDeployments.Remove(deploymentId);
}
- // Note: Do NOT remove from _siteAddressCache — addresses are persistent in the database
+ // Note: Do NOT stop the ClusterClient — it handles reconnection internally
}
else
{
@@ -122,30 +147,22 @@ public class CentralCommunicationActor : ReceiveActor
private void HandleSiteEnvelope(SiteEnvelope envelope)
{
- if (!_siteAddressCache.TryGetValue(envelope.SiteId, out var entry))
+ if (!_siteClients.TryGetValue(envelope.SiteId, out var entry))
{
- _log.Warning("No known address for site {0}, cannot route message {1}",
+ _log.Warning("No ClusterClient for site {0}, cannot route message {1}",
envelope.SiteId, envelope.Message.GetType().Name);
// The Ask will timeout on the caller side — no central buffering (WP-5)
return;
}
- // Prefer NodeA, fall back to NodeB
- var selection = entry.NodeA ?? entry.NodeB;
- if (selection == null)
- {
- _log.Warning("Site {0} has no configured node addresses, cannot route message {1}",
- envelope.SiteId, envelope.Message.GetType().Name);
- return;
- }
-
// Track debug subscriptions for cleanup on disconnect
TrackMessageForCleanup(envelope);
- // Forward the inner message to the site, preserving the original sender
- // so the site can reply directly to the caller (completing the Ask pattern)
- selection.Tell(envelope.Message, Sender);
+ // Route via ClusterClient — Sender is preserved for Ask response routing
+ entry.Client.Tell(
+ new ClusterClient.Send("/user/site-communication", envelope.Message),
+ Sender);
}
private void LoadSiteAddressesFromDb()
@@ -157,34 +174,72 @@ public class CentralCommunicationActor : ReceiveActor
var repo = scope.ServiceProvider.GetRequiredService();
var sites = await repo.GetAllSitesAsync();
- var cache = new Dictionary();
+ var contacts = new Dictionary>();
foreach (var site in sites)
{
- if (string.IsNullOrWhiteSpace(site.NodeAAddress) && string.IsNullOrWhiteSpace(site.NodeBAddress))
- continue;
-
- cache[site.SiteIdentifier] = (site.NodeAAddress, site.NodeBAddress);
+ var addrs = new List();
+ if (!string.IsNullOrWhiteSpace(site.NodeAAddress))
+ {
+ var addr = site.NodeAAddress;
+ // Strip actor path suffix if present (legacy format)
+ var idx = addr.IndexOf("/user/");
+ if (idx > 0) addr = addr.Substring(0, idx);
+ addrs.Add(addr);
+ }
+ if (!string.IsNullOrWhiteSpace(site.NodeBAddress))
+ {
+ var addr = site.NodeBAddress;
+ var idx = addr.IndexOf("/user/");
+ if (idx > 0) addr = addr.Substring(0, idx);
+ addrs.Add(addr);
+ }
+ if (addrs.Count > 0)
+ contacts[site.SiteIdentifier] = addrs;
}
- return new SiteAddressCacheLoaded(cache);
+ return new SiteAddressCacheLoaded(contacts);
}).PipeTo(self);
}
private void HandleSiteAddressCacheLoaded(SiteAddressCacheLoaded msg)
{
- var newCache = new Dictionary();
- foreach (var (siteId, (nodeAAddr, nodeBAddr)) in msg.Addresses)
+ var newSiteIds = msg.SiteContacts.Keys.ToHashSet();
+ var existingSiteIds = _siteClients.Keys.ToHashSet();
+
+ // Stop ClusterClients for removed sites
+ foreach (var removed in existingSiteIds.Except(newSiteIds))
{
- var nodeA = !string.IsNullOrWhiteSpace(nodeAAddr)
- ? Context.ActorSelection(nodeAAddr)
- : null;
- var nodeB = !string.IsNullOrWhiteSpace(nodeBAddr)
- ? Context.ActorSelection(nodeBAddr)
- : null;
- newCache[siteId] = (nodeA, nodeB);
+ _log.Info("Stopping ClusterClient for removed site {0}", removed);
+ Context.Stop(_siteClients[removed].Client);
+ _siteClients.Remove(removed);
}
- _siteAddressCache = newCache;
- _log.Info("Site address cache refreshed with {0} site(s)", _siteAddressCache.Count);
+
+ // Add or update
+ foreach (var (siteId, addresses) in msg.SiteContacts)
+ {
+ var contactPaths = addresses
+ .Select(a => ActorPath.Parse($"{a}/system/receptionist"))
+ .ToImmutableHashSet();
+
+ var contactStrings = addresses.ToImmutableHashSet();
+
+ // Skip if unchanged
+ if (_siteClients.TryGetValue(siteId, out var existing) && existing.ContactAddresses.SetEquals(contactStrings))
+ continue;
+
+ // Stop old client if addresses changed
+ if (_siteClients.ContainsKey(siteId))
+ {
+ _log.Info("Updating ClusterClient for site {0} (addresses changed)", siteId);
+ Context.Stop(_siteClients[siteId].Client);
+ }
+
+ var client = _siteClientFactory.Create(Context.System, siteId, contactPaths);
+ _siteClients[siteId] = (client, contactStrings);
+ _log.Info("Created ClusterClient for site {0} with {1} contact(s)", siteId, addresses.Count);
+ }
+
+ _log.Info("Site ClusterClient cache refreshed with {0} site(s)", _siteClients.Count);
}
private void TrackMessageForCleanup(SiteEnvelope envelope)
@@ -234,10 +289,10 @@ public class CentralCommunicationActor : ReceiveActor
public record RefreshSiteAddresses;
///
-/// Internal message carrying the loaded site address data from the database.
-/// ActorSelection creation happens on the actor thread in HandleSiteAddressCacheLoaded.
+/// Internal message carrying the loaded site contact data from the database.
+/// ClusterClient creation happens on the actor thread in HandleSiteAddressCacheLoaded.
///
-internal record SiteAddressCacheLoaded(Dictionary Addresses);
+internal record SiteAddressCacheLoaded(Dictionary> SiteContacts);
///
/// Notification sent to debug view subscribers when the stream is terminated
diff --git a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs
index be71438..0b3512b 100644
--- a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs
+++ b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs
@@ -1,4 +1,5 @@
using Akka.Actor;
+using Akka.Cluster.Tools.Client;
using Akka.Event;
using ScadaLink.Commons.Messages.Artifacts;
using ScadaLink.Commons.Messages.DebugView;
@@ -11,9 +12,9 @@ using ScadaLink.Commons.Messages.RemoteQuery;
namespace ScadaLink.Communication.Actors;
///
-/// Site-side actor that receives messages from central via Akka remoting and routes
+/// Site-side actor that receives messages from central via ClusterClient and routes
/// them to the appropriate local actors. Also sends heartbeats and health reports
-/// to central.
+/// to central via the registered ClusterClient.
///
/// WP-4: Routes all 8 message patterns to local handlers.
///
@@ -29,10 +30,10 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
private readonly IActorRef _deploymentManagerProxy;
///
- /// Optional reference to the central communication actor for sending heartbeats/health.
- /// Set via RegisterCentral message.
+ /// ClusterClient reference for sending messages to the central cluster.
+ /// Set via RegisterCentralClient message.
///
- private ActorSelection? _centralSelection;
+ private IActorRef? _centralClient;
///
/// Local actor references for routing specific message patterns.
@@ -55,7 +56,11 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
_deploymentManagerProxy = deploymentManagerProxy;
// Registration
- Receive(HandleRegisterCentral);
+ Receive(msg =>
+ {
+ _centralClient = msg.Client;
+ _log.Info("Registered central ClusterClient");
+ });
Receive(HandleRegisterLocalHandler);
// Pattern 1: Instance Deployment — forward to Deployment Manager
@@ -130,7 +135,8 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
// Internal: forward health report to central
Receive(msg =>
{
- _centralSelection?.Tell(msg, Self);
+ _centralClient?.Tell(
+ new ClusterClient.Send("/user/central-communication", msg), Self);
});
}
@@ -146,12 +152,6 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
_options.TransportHeartbeatInterval);
}
- private void HandleRegisterCentral(RegisterCentralPath msg)
- {
- _centralSelection = Context.ActorSelection(msg.CentralActorPath);
- _log.Info("Registered central communication path: {0}", msg.CentralActorPath);
- }
-
private void HandleRegisterLocalHandler(RegisterLocalHandler msg)
{
switch (msg.HandlerType)
@@ -175,7 +175,7 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
private void SendHeartbeatToCentral()
{
- if (_centralSelection == null)
+ if (_centralClient == null)
return;
var hostname = Environment.MachineName;
@@ -185,7 +185,8 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
IsActive: true,
DateTimeOffset.UtcNow);
- _centralSelection.Tell(heartbeat, Self);
+ _centralClient.Tell(
+ new ClusterClient.Send("/user/central-communication", heartbeat), Self);
}
// ── Internal messages ──
@@ -194,9 +195,9 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
}
///
-/// Command to register the central communication actor path for outbound messages.
+/// Command to register a ClusterClient for communicating with the central cluster.
///
-public record RegisterCentralPath(string CentralActorPath);
+public record RegisterCentralClient(IActorRef Client);
///
/// Command to register a local actor as a handler for a specific message pattern.
diff --git a/src/ScadaLink.Communication/CommunicationOptions.cs b/src/ScadaLink.Communication/CommunicationOptions.cs
index 1966565..1542a99 100644
--- a/src/ScadaLink.Communication/CommunicationOptions.cs
+++ b/src/ScadaLink.Communication/CommunicationOptions.cs
@@ -28,10 +28,10 @@ public class CommunicationOptions
public TimeSpan HealthReportTimeout { get; set; } = TimeSpan.FromSeconds(10);
///
- /// Remote actor path for the central communication actor. Used by site nodes to
- /// register with central on startup (e.g. "akka.tcp://scadalink@central:8081/user/central-communication").
+ /// Contact point addresses for the central cluster (e.g. "akka.tcp://scadalink@central-a:8081").
+ /// Used by site nodes to create a ClusterClient for reaching central.
///
- public string? CentralActorPath { get; set; }
+ public List CentralContactPoints { get; set; } = new();
/// Akka.Remote transport heartbeat interval.
public TimeSpan TransportHeartbeatInterval { get; set; } = TimeSpan.FromSeconds(5);
diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs
index 8bf6bf6..4a20a91 100644
--- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs
+++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs
@@ -1,3 +1,4 @@
+using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster;
using Akka.Cluster.Tools.Client;
@@ -168,10 +169,15 @@ akka {{
///
private void RegisterCentralActors()
{
+ var siteClientFactory = new DefaultSiteClientFactory();
var centralCommActor = _actorSystem!.ActorOf(
- Props.Create(() => new CentralCommunicationActor(_serviceProvider)),
+ Props.Create(() => new CentralCommunicationActor(_serviceProvider, siteClientFactory)),
"central-communication");
+ // Register CentralCommunicationActor with ClusterClientReceptionist so site ClusterClients can reach it
+ ClusterClientReceptionist.Get(_actorSystem).RegisterService(centralCommActor);
+ _logger.LogInformation("CentralCommunicationActor registered with ClusterClientReceptionist");
+
// Wire up the CommunicationService with the actor reference
var commService = _serviceProvider.GetService();
commService?.SetCommunicationActor(centralCommActor);
@@ -247,26 +253,37 @@ akka {{
var dmProxy = _actorSystem.ActorOf(proxyProps, "deployment-manager-proxy");
// WP-4: Create SiteCommunicationActor for receiving messages from central
- _actorSystem.ActorOf(
+ var siteCommActor = _actorSystem.ActorOf(
Props.Create(() => new SiteCommunicationActor(
_nodeOptions.SiteId!,
_communicationOptions,
dmProxy)),
"site-communication");
+ // Register SiteCommunicationActor with ClusterClientReceptionist so central ClusterClients can reach it
+ ClusterClientReceptionist.Get(_actorSystem).RegisterService(siteCommActor);
+
_logger.LogInformation(
"Site actors registered. DeploymentManager singleton scoped to role={SiteRole}, SiteCommunicationActor created.",
siteRole);
- // Register with Central if configured — tells Central where to send deployment commands
- if (!string.IsNullOrWhiteSpace(_communicationOptions.CentralActorPath))
+ // Create ClusterClient to central if contact points are configured
+ if (_communicationOptions.CentralContactPoints.Count > 0)
{
- var siteCommActor = _actorSystem.ActorSelection("/user/site-communication");
- siteCommActor.Tell(new RegisterCentralPath(_communicationOptions.CentralActorPath));
+ var contacts = _communicationOptions.CentralContactPoints
+ .Select(cp => ActorPath.Parse($"{cp}/system/receptionist"))
+ .ToImmutableHashSet();
+ var clientSettings = ClusterClientSettings.Create(_actorSystem)
+ .WithInitialContacts(contacts);
+ var centralClient = _actorSystem.ActorOf(
+ ClusterClient.Props(clientSettings), "central-cluster-client");
+
+ var siteCommSelection = _actorSystem.ActorSelection("/user/site-communication");
+ siteCommSelection.Tell(new RegisterCentralClient(centralClient));
_logger.LogInformation(
- "Configured central heartbeat path at {CentralPath} for site {SiteId}",
- _communicationOptions.CentralActorPath, _nodeOptions.SiteId);
+ "Created ClusterClient to central with {Count} contact point(s) for site {SiteId}",
+ contacts.Count, _nodeOptions.SiteId);
}
}
}
diff --git a/src/ScadaLink.Host/appsettings.Site.json b/src/ScadaLink.Host/appsettings.Site.json
index fc68384..2a8948f 100644
--- a/src/ScadaLink.Host/appsettings.Site.json
+++ b/src/ScadaLink.Host/appsettings.Site.json
@@ -30,7 +30,10 @@
"ReplicationEnabled": true
},
"Communication": {
- "CentralActorPath": "akka.tcp://scadalink@localhost:8081/user/central-communication",
+ "CentralContactPoints": [
+ "akka.tcp://scadalink@localhost:8081",
+ "akka.tcp://scadalink@localhost:8082"
+ ],
"DeploymentTimeout": "00:02:00",
"LifecycleTimeout": "00:00:30",
"QueryTimeout": "00:00:30",
diff --git a/tests/ScadaLink.Communication.Tests/CentralCommunicationActorTests.cs b/tests/ScadaLink.Communication.Tests/CentralCommunicationActorTests.cs
index 9db4851..010ee3b 100644
--- a/tests/ScadaLink.Communication.Tests/CentralCommunicationActorTests.cs
+++ b/tests/ScadaLink.Communication.Tests/CentralCommunicationActorTests.cs
@@ -1,4 +1,6 @@
+using System.Collections.Immutable;
using Akka.Actor;
+using Akka.Cluster.Tools.Client;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using NSubstitute;
@@ -9,19 +11,21 @@ using ScadaLink.Commons.Messages.Deployment;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Health;
using ScadaLink.Communication.Actors;
+using Akka.TestKit;
namespace ScadaLink.Communication.Tests;
///
-/// Tests for CentralCommunicationActor with database-driven site addressing.
-/// WP-4: Message routing via site address cache loaded from DB.
+/// Tests for CentralCommunicationActor with per-site ClusterClient routing.
+/// WP-4: Message routing via ClusterClient instances created per site.
/// WP-5: Connection failure and failover handling.
///
public class CentralCommunicationActorTests : TestKit
{
public CentralCommunicationActorTests() : base(@"akka.loglevel = DEBUG") { }
- private (IActorRef actor, ISiteRepository mockRepo) CreateActorWithMockRepo(IEnumerable? sites = null)
+ private (IActorRef actor, ISiteRepository mockRepo, Dictionary siteProbes) CreateActorWithMockRepo(
+ IEnumerable? sites = null)
{
var mockRepo = Substitute.For();
mockRepo.GetAllSitesAsync(Arg.Any())
@@ -31,37 +35,76 @@ public class CentralCommunicationActorTests : TestKit
services.AddScoped(_ => mockRepo);
var sp = services.BuildServiceProvider();
- var actor = Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp)));
- return (actor, mockRepo);
+ var siteProbes = new Dictionary();
+ var mockFactory = Substitute.For();
+ mockFactory.Create(Arg.Any(), Arg.Any(), Arg.Any>())
+ .Returns(callInfo =>
+ {
+ var siteId = callInfo.ArgAt(1);
+ var probe = CreateTestProbe();
+ siteProbes[siteId] = probe;
+ return probe.Ref;
+ });
+
+ var actor = Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory)));
+ return (actor, mockRepo, siteProbes);
}
- private Site CreateSite(string identifier, string? nodeAPath, string? nodeBPath = null) =>
- new("Test Site", identifier) { NodeAAddress = nodeAPath, NodeBAddress = nodeBPath };
+ private (IActorRef actor, ISiteRepository mockRepo, Dictionary siteProbes, ISiteClientFactory mockFactory) CreateActorWithFactory(
+ IEnumerable? sites = null)
+ {
+ var mockRepo = Substitute.For();
+ mockRepo.GetAllSitesAsync(Arg.Any())
+ .Returns(sites?.ToList() ?? new List());
+
+ var services = new ServiceCollection();
+ services.AddScoped(_ => mockRepo);
+ var sp = services.BuildServiceProvider();
+
+ var siteProbes = new Dictionary();
+ var mockFactory = Substitute.For();
+ mockFactory.Create(Arg.Any(), Arg.Any(), Arg.Any>())
+ .Returns(callInfo =>
+ {
+ var siteId = callInfo.ArgAt(1);
+ var probe = CreateTestProbe();
+ siteProbes[siteId] = probe;
+ return probe.Ref;
+ });
+
+ var actor = Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory)));
+ return (actor, mockRepo, siteProbes, mockFactory);
+ }
+
+ private Site CreateSite(string identifier, string? nodeAAddress, string? nodeBAddress = null) =>
+ new("Test Site", identifier) { NodeAAddress = nodeAAddress, NodeBAddress = nodeBAddress };
[Fact]
- public void DatabaseDrivenRouting_RoutesToConfiguredSite()
+ public void ClusterClientRouting_RoutesToConfiguredSite()
{
- var probe = CreateTestProbe();
- var site = CreateSite("site1", probe.Ref.Path.ToString());
- var (actor, _) = CreateActorWithMockRepo(new[] { site });
+ var site = CreateSite("site1", "akka.tcp://scadalink@host:8082");
+ var (actor, _, siteProbes) = CreateActorWithMockRepo(new[] { site });
- // Send explicit refresh and wait for async DB load + PipeTo
- actor.Tell(new RefreshSiteAddresses());
+ // Wait for auto-refresh (PreStart schedules with TimeSpan.Zero initial delay)
Thread.Sleep(1000);
var command = new DeployInstanceCommand(
"dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow);
actor.Tell(new SiteEnvelope("site1", command));
- probe.ExpectMsg(msg => msg.DeploymentId == "dep1");
+ // The site1 probe (acting as ClusterClient) should receive a ClusterClient.Send
+ var msg = siteProbes["site1"].ExpectMsg();
+ Assert.Equal("/user/site-communication", msg.Path);
+ Assert.IsType(msg.Message);
+ Assert.Equal("dep1", ((DeployInstanceCommand)msg.Message).DeploymentId);
}
[Fact]
public void UnconfiguredSite_MessageIsDropped()
{
- var (actor, _) = CreateActorWithMockRepo();
+ var (actor, _, _) = CreateActorWithMockRepo();
- actor.Tell(new RefreshSiteAddresses());
+ // Wait for auto-refresh
Thread.Sleep(1000);
var command = new DeployInstanceCommand(
@@ -74,11 +117,10 @@ public class CentralCommunicationActorTests : TestKit
[Fact]
public void ConnectionLost_DebugStreamsKilled()
{
- var siteProbe = CreateTestProbe();
- var site = CreateSite("site1", siteProbe.Ref.Path.ToString());
- var (actor, _) = CreateActorWithMockRepo(new[] { site });
+ var site = CreateSite("site1", "akka.tcp://scadalink@host:8082");
+ var (actor, _, siteProbes) = CreateActorWithMockRepo(new[] { site });
- actor.Tell(new RefreshSiteAddresses());
+ // Wait for auto-refresh
Thread.Sleep(1000);
// Subscribe to debug view (tracks the subscription)
@@ -86,6 +128,9 @@ public class CentralCommunicationActorTests : TestKit
var subRequest = new SubscribeDebugViewRequest("inst1", "corr-123");
actor.Tell(new SiteEnvelope("site1", subRequest), subscriberProbe.Ref);
+ // The ClusterClient probe receives the routed message
+ siteProbes["site1"].ExpectMsg();
+
// Simulate site disconnection
actor.Tell(new ConnectionStateChanged("site1", false, DateTimeOffset.UtcNow));
@@ -97,7 +142,6 @@ public class CentralCommunicationActorTests : TestKit
[Fact]
public void Heartbeat_ForwardedToParent()
{
- // Actor still needs IServiceProvider even though this test doesn't use routing
var mockRepo = Substitute.For();
mockRepo.GetAllSitesAsync(Arg.Any())
.Returns(new List());
@@ -106,9 +150,10 @@ public class CentralCommunicationActorTests : TestKit
services.AddScoped(_ => mockRepo);
var sp = services.BuildServiceProvider();
+ var siteClientFactory = Substitute.For();
var parentProbe = CreateTestProbe();
var centralActor = parentProbe.ChildActorOf(
- Props.Create(() => new CentralCommunicationActor(sp)));
+ Props.Create(() => new CentralCommunicationActor(sp, siteClientFactory)));
var heartbeat = new HeartbeatMessage("site1", "host1", true, DateTimeOffset.UtcNow);
centralActor.Tell(heartbeat);
@@ -119,24 +164,21 @@ public class CentralCommunicationActorTests : TestKit
[Fact]
public void RefreshSiteAddresses_UpdatesCache()
{
- var probe1 = CreateTestProbe();
- var probe2 = CreateTestProbe();
+ var site1 = CreateSite("site1", "akka.tcp://scadalink@host1:8082");
+ var (actor, mockRepo, siteProbes) = CreateActorWithMockRepo(new[] { site1 });
- var site1 = CreateSite("site1", probe1.Ref.Path.ToString());
- var (actor, mockRepo) = CreateActorWithMockRepo(new[] { site1 });
-
- // Wait for initial load, then send explicit refresh
- actor.Tell(new RefreshSiteAddresses());
+ // Wait for initial load
Thread.Sleep(1000);
// Verify routing to site1 works
var cmd1 = new DeployInstanceCommand(
"dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow);
actor.Tell(new SiteEnvelope("site1", cmd1));
- probe1.ExpectMsg(msg => msg.DeploymentId == "dep1");
+ var msg1 = siteProbes["site1"].ExpectMsg();
+ Assert.Equal("dep1", ((DeployInstanceCommand)msg1.Message).DeploymentId);
// Update mock repo to return both sites
- var site2 = CreateSite("site2", probe2.Ref.Path.ToString());
+ var site2 = CreateSite("site2", "akka.tcp://scadalink@host2:8082");
mockRepo.GetAllSitesAsync(Arg.Any())
.Returns(new List { site1, site2 });
@@ -148,23 +190,26 @@ public class CentralCommunicationActorTests : TestKit
var cmd2 = new DeployInstanceCommand(
"dep2", "inst2", "hash2", "{}", "admin", DateTimeOffset.UtcNow);
actor.Tell(new SiteEnvelope("site2", cmd2));
- probe2.ExpectMsg(msg => msg.DeploymentId == "dep2");
+ var msg2 = siteProbes["site2"].ExpectMsg();
+ Assert.Equal("dep2", ((DeployInstanceCommand)msg2.Message).DeploymentId);
}
[Fact]
- public void NodeBFallback_WhenNodeANotConfigured()
+ public void BothContactPoints_UsedInSingleClient()
{
- var probe = CreateTestProbe();
- var site = CreateSite("site1", null, probe.Ref.Path.ToString());
- var (actor, _) = CreateActorWithMockRepo(new[] { site });
+ var site = CreateSite("site1",
+ "akka.tcp://scadalink@host1:8082",
+ "akka.tcp://scadalink@host2:8082");
- actor.Tell(new RefreshSiteAddresses());
+ var (actor, _, siteProbes, mockFactory) = CreateActorWithFactory(new[] { site });
+
+ // Wait for auto-refresh
Thread.Sleep(1000);
- var command = new DeployInstanceCommand(
- "dep1", "inst1", "hash1", "{}", "admin", DateTimeOffset.UtcNow);
- actor.Tell(new SiteEnvelope("site1", command));
-
- probe.ExpectMsg(msg => msg.DeploymentId == "dep1");
+ // Verify the factory was called with 2 contact paths
+ mockFactory.Received(1).Create(
+ Arg.Any(),
+ Arg.Is("site1"),
+ Arg.Is>(paths => paths.Count == 2));
}
}