feat: replace ActorSelection with ClusterClient for inter-cluster communication

Central and site clusters now communicate via ClusterClient/
ClusterClientReceptionist instead of direct ActorSelection. Both
CentralCommunicationActor and SiteCommunicationActor are registered
with their cluster's receptionist. Central creates one ClusterClient
per site using NodeA/NodeB contact points from the DB. Sites configure
multiple CentralContactPoints for automatic failover between central
nodes. ISiteClientFactory enables test injection.
This commit is contained in:
Joseph Doherty
2026-03-18 00:08:47 -04:00
parent e5eb871961
commit 4f22ca2b1f
15 changed files with 287 additions and 136 deletions

View File

@@ -1,4 +1,6 @@
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Event;
using Microsoft.Extensions.DependencyInjection;
using ScadaLink.Commons.Interfaces.Repositories;
@@ -9,8 +11,29 @@ using ScadaLink.HealthMonitoring;
namespace ScadaLink.Communication.Actors;
/// <summary>
/// Central-side actor that routes messages from central to site clusters via Akka remoting.
/// Resolves site addresses from the database on a periodic refresh cycle.
/// Abstraction for creating ClusterClient instances per site, enabling testability.
/// </summary>
public interface ISiteClientFactory
{
IActorRef Create(ActorSystem system, string siteId, ImmutableHashSet<ActorPath> contacts);
}
/// <summary>
/// Default implementation that creates a real ClusterClient for each site.
/// </summary>
public class DefaultSiteClientFactory : ISiteClientFactory
{
public IActorRef Create(ActorSystem system, string siteId, ImmutableHashSet<ActorPath> contacts)
{
var settings = ClusterClientSettings.Create(system).WithInitialContacts(contacts);
return system.ActorOf(ClusterClient.Props(settings), $"site-client-{siteId}");
}
}
/// <summary>
/// Central-side actor that routes messages from central to site clusters via ClusterClient.
/// Resolves site addresses from the database on a periodic refresh cycle and manages
/// per-site ClusterClient instances.
///
/// WP-4: All 8 message patterns routed through this actor.
/// WP-5: Ask timeout on connection drop (no central buffering). Debug streams killed on interruption.
@@ -19,13 +42,14 @@ public class CentralCommunicationActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly IServiceProvider _serviceProvider;
private readonly ISiteClientFactory _siteClientFactory;
/// <summary>
/// Cached site address entries loaded from the database.
/// Maps SiteIdentifier → (NodeA selection, NodeB selection).
/// Per-site ClusterClient instances and their contact addresses.
/// Maps SiteIdentifier → (ClusterClient actor, set of contact address strings).
/// Refreshed periodically via RefreshSiteAddresses.
/// </summary>
private Dictionary<string, (ActorSelection? NodeA, ActorSelection? NodeB)> _siteAddressCache = new();
private Dictionary<string, (IActorRef Client, ImmutableHashSet<string> ContactAddresses)> _siteClients = new();
/// <summary>
/// Tracks active debug view subscriptions: correlationId → (siteId, subscriber).
@@ -41,9 +65,10 @@ public class CentralCommunicationActor : ReceiveActor
private ICancelable? _refreshSchedule;
public CentralCommunicationActor(IServiceProvider serviceProvider)
public CentralCommunicationActor(IServiceProvider serviceProvider, ISiteClientFactory siteClientFactory)
{
_serviceProvider = serviceProvider;
_siteClientFactory = siteClientFactory;
// Site address cache loaded from database
Receive<SiteAddressCacheLoaded>(HandleSiteAddressCacheLoaded);
@@ -112,7 +137,7 @@ public class CentralCommunicationActor : ReceiveActor
_inProgressDeployments.Remove(deploymentId);
}
// Note: Do NOT remove from _siteAddressCache — addresses are persistent in the database
// Note: Do NOT stop the ClusterClient — it handles reconnection internally
}
else
{
@@ -122,30 +147,22 @@ public class CentralCommunicationActor : ReceiveActor
private void HandleSiteEnvelope(SiteEnvelope envelope)
{
if (!_siteAddressCache.TryGetValue(envelope.SiteId, out var entry))
if (!_siteClients.TryGetValue(envelope.SiteId, out var entry))
{
_log.Warning("No known address for site {0}, cannot route message {1}",
_log.Warning("No ClusterClient for site {0}, cannot route message {1}",
envelope.SiteId, envelope.Message.GetType().Name);
// The Ask will timeout on the caller side — no central buffering (WP-5)
return;
}
// Prefer NodeA, fall back to NodeB
var selection = entry.NodeA ?? entry.NodeB;
if (selection == null)
{
_log.Warning("Site {0} has no configured node addresses, cannot route message {1}",
envelope.SiteId, envelope.Message.GetType().Name);
return;
}
// Track debug subscriptions for cleanup on disconnect
TrackMessageForCleanup(envelope);
// Forward the inner message to the site, preserving the original sender
// so the site can reply directly to the caller (completing the Ask pattern)
selection.Tell(envelope.Message, Sender);
// Route via ClusterClient — Sender is preserved for Ask response routing
entry.Client.Tell(
new ClusterClient.Send("/user/site-communication", envelope.Message),
Sender);
}
private void LoadSiteAddressesFromDb()
@@ -157,34 +174,72 @@ public class CentralCommunicationActor : ReceiveActor
var repo = scope.ServiceProvider.GetRequiredService<ISiteRepository>();
var sites = await repo.GetAllSitesAsync();
var cache = new Dictionary<string, (string? NodeAAddress, string? NodeBAddress)>();
var contacts = new Dictionary<string, List<string>>();
foreach (var site in sites)
{
if (string.IsNullOrWhiteSpace(site.NodeAAddress) && string.IsNullOrWhiteSpace(site.NodeBAddress))
continue;
cache[site.SiteIdentifier] = (site.NodeAAddress, site.NodeBAddress);
var addrs = new List<string>();
if (!string.IsNullOrWhiteSpace(site.NodeAAddress))
{
var addr = site.NodeAAddress;
// Strip actor path suffix if present (legacy format)
var idx = addr.IndexOf("/user/");
if (idx > 0) addr = addr.Substring(0, idx);
addrs.Add(addr);
}
if (!string.IsNullOrWhiteSpace(site.NodeBAddress))
{
var addr = site.NodeBAddress;
var idx = addr.IndexOf("/user/");
if (idx > 0) addr = addr.Substring(0, idx);
addrs.Add(addr);
}
if (addrs.Count > 0)
contacts[site.SiteIdentifier] = addrs;
}
return new SiteAddressCacheLoaded(cache);
return new SiteAddressCacheLoaded(contacts);
}).PipeTo(self);
}
private void HandleSiteAddressCacheLoaded(SiteAddressCacheLoaded msg)
{
var newCache = new Dictionary<string, (ActorSelection? NodeA, ActorSelection? NodeB)>();
foreach (var (siteId, (nodeAAddr, nodeBAddr)) in msg.Addresses)
var newSiteIds = msg.SiteContacts.Keys.ToHashSet();
var existingSiteIds = _siteClients.Keys.ToHashSet();
// Stop ClusterClients for removed sites
foreach (var removed in existingSiteIds.Except(newSiteIds))
{
var nodeA = !string.IsNullOrWhiteSpace(nodeAAddr)
? Context.ActorSelection(nodeAAddr)
: null;
var nodeB = !string.IsNullOrWhiteSpace(nodeBAddr)
? Context.ActorSelection(nodeBAddr)
: null;
newCache[siteId] = (nodeA, nodeB);
_log.Info("Stopping ClusterClient for removed site {0}", removed);
Context.Stop(_siteClients[removed].Client);
_siteClients.Remove(removed);
}
_siteAddressCache = newCache;
_log.Info("Site address cache refreshed with {0} site(s)", _siteAddressCache.Count);
// Add or update
foreach (var (siteId, addresses) in msg.SiteContacts)
{
var contactPaths = addresses
.Select(a => ActorPath.Parse($"{a}/system/receptionist"))
.ToImmutableHashSet();
var contactStrings = addresses.ToImmutableHashSet();
// Skip if unchanged
if (_siteClients.TryGetValue(siteId, out var existing) && existing.ContactAddresses.SetEquals(contactStrings))
continue;
// Stop old client if addresses changed
if (_siteClients.ContainsKey(siteId))
{
_log.Info("Updating ClusterClient for site {0} (addresses changed)", siteId);
Context.Stop(_siteClients[siteId].Client);
}
var client = _siteClientFactory.Create(Context.System, siteId, contactPaths);
_siteClients[siteId] = (client, contactStrings);
_log.Info("Created ClusterClient for site {0} with {1} contact(s)", siteId, addresses.Count);
}
_log.Info("Site ClusterClient cache refreshed with {0} site(s)", _siteClients.Count);
}
private void TrackMessageForCleanup(SiteEnvelope envelope)
@@ -234,10 +289,10 @@ public class CentralCommunicationActor : ReceiveActor
public record RefreshSiteAddresses;
/// <summary>
/// Internal message carrying the loaded site address data from the database.
/// ActorSelection creation happens on the actor thread in HandleSiteAddressCacheLoaded.
/// Internal message carrying the loaded site contact data from the database.
/// ClusterClient creation happens on the actor thread in HandleSiteAddressCacheLoaded.
/// </summary>
internal record SiteAddressCacheLoaded(Dictionary<string, (string? NodeAAddress, string? NodeBAddress)> Addresses);
internal record SiteAddressCacheLoaded(Dictionary<string, List<string>> SiteContacts);
/// <summary>
/// Notification sent to debug view subscribers when the stream is terminated

View File

@@ -1,4 +1,5 @@
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Event;
using ScadaLink.Commons.Messages.Artifacts;
using ScadaLink.Commons.Messages.DebugView;
@@ -11,9 +12,9 @@ using ScadaLink.Commons.Messages.RemoteQuery;
namespace ScadaLink.Communication.Actors;
/// <summary>
/// Site-side actor that receives messages from central via Akka remoting and routes
/// Site-side actor that receives messages from central via ClusterClient and routes
/// them to the appropriate local actors. Also sends heartbeats and health reports
/// to central.
/// to central via the registered ClusterClient.
///
/// WP-4: Routes all 8 message patterns to local handlers.
/// </summary>
@@ -29,10 +30,10 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
private readonly IActorRef _deploymentManagerProxy;
/// <summary>
/// Optional reference to the central communication actor for sending heartbeats/health.
/// Set via RegisterCentral message.
/// ClusterClient reference for sending messages to the central cluster.
/// Set via RegisterCentralClient message.
/// </summary>
private ActorSelection? _centralSelection;
private IActorRef? _centralClient;
/// <summary>
/// Local actor references for routing specific message patterns.
@@ -55,7 +56,11 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
_deploymentManagerProxy = deploymentManagerProxy;
// Registration
Receive<RegisterCentralPath>(HandleRegisterCentral);
Receive<RegisterCentralClient>(msg =>
{
_centralClient = msg.Client;
_log.Info("Registered central ClusterClient");
});
Receive<RegisterLocalHandler>(HandleRegisterLocalHandler);
// Pattern 1: Instance Deployment — forward to Deployment Manager
@@ -130,7 +135,8 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
// Internal: forward health report to central
Receive<SiteHealthReport>(msg =>
{
_centralSelection?.Tell(msg, Self);
_centralClient?.Tell(
new ClusterClient.Send("/user/central-communication", msg), Self);
});
}
@@ -146,12 +152,6 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
_options.TransportHeartbeatInterval);
}
private void HandleRegisterCentral(RegisterCentralPath msg)
{
_centralSelection = Context.ActorSelection(msg.CentralActorPath);
_log.Info("Registered central communication path: {0}", msg.CentralActorPath);
}
private void HandleRegisterLocalHandler(RegisterLocalHandler msg)
{
switch (msg.HandlerType)
@@ -175,7 +175,7 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
private void SendHeartbeatToCentral()
{
if (_centralSelection == null)
if (_centralClient == null)
return;
var hostname = Environment.MachineName;
@@ -185,7 +185,8 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
IsActive: true,
DateTimeOffset.UtcNow);
_centralSelection.Tell(heartbeat, Self);
_centralClient.Tell(
new ClusterClient.Send("/user/central-communication", heartbeat), Self);
}
// ── Internal messages ──
@@ -194,9 +195,9 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
}
/// <summary>
/// Command to register the central communication actor path for outbound messages.
/// Command to register a ClusterClient for communicating with the central cluster.
/// </summary>
public record RegisterCentralPath(string CentralActorPath);
public record RegisterCentralClient(IActorRef Client);
/// <summary>
/// Command to register a local actor as a handler for a specific message pattern.

View File

@@ -28,10 +28,10 @@ public class CommunicationOptions
public TimeSpan HealthReportTimeout { get; set; } = TimeSpan.FromSeconds(10);
/// <summary>
/// Remote actor path for the central communication actor. Used by site nodes to
/// register with central on startup (e.g. "akka.tcp://scadalink@central:8081/user/central-communication").
/// Contact point addresses for the central cluster (e.g. "akka.tcp://scadalink@central-a:8081").
/// Used by site nodes to create a ClusterClient for reaching central.
/// </summary>
public string? CentralActorPath { get; set; }
public List<string> CentralContactPoints { get; set; } = new();
/// <summary>Akka.Remote transport heartbeat interval.</summary>
public TimeSpan TransportHeartbeatInterval { get; set; } = TimeSpan.FromSeconds(5);