feat: replace site registration with database-driven site addressing

Central now resolves site Akka remoting addresses from the Sites DB table
(NodeAAddress/NodeBAddress) instead of relying on runtime RegisterSite
messages. Eliminates the race condition where sites starting before central
had their registration dead-lettered. Addresses are cached in
CentralCommunicationActor with 60s periodic refresh and on-demand refresh
when sites are added/edited/deleted via UI or CLI.
This commit is contained in:
Joseph Doherty
2026-03-17 23:13:10 -04:00
parent eb8d5ca2c0
commit 9e97c1acd2
21 changed files with 1641 additions and 92 deletions

View File

@@ -1,5 +1,7 @@
using Akka.Actor;
using Akka.Event;
using Microsoft.Extensions.DependencyInjection;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Communication;
using ScadaLink.Commons.Messages.Health;
@@ -7,7 +9,7 @@ namespace ScadaLink.Communication.Actors;
/// <summary>
/// Central-side actor that routes messages from central to site clusters via Akka remoting.
/// Maintains a registry of known site actor paths (learned from heartbeats/connection events).
/// Resolves site addresses from the database on a periodic refresh cycle.
///
/// WP-4: All 8 message patterns routed through this actor.
/// WP-5: Ask timeout on connection drop (no central buffering). Debug streams killed on interruption.
@@ -15,12 +17,14 @@ namespace ScadaLink.Communication.Actors;
public class CentralCommunicationActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly IServiceProvider _serviceProvider;
/// <summary>
/// Maps SiteId → remote SiteCommunicationActor selection.
/// Updated when heartbeats arrive or connection state changes.
/// Cached site address entries loaded from the database.
/// Maps SiteIdentifier → (NodeA selection, NodeB selection).
/// Refreshed periodically via RefreshSiteAddresses.
/// </summary>
private readonly Dictionary<string, ActorSelection> _siteSelections = new();
private Dictionary<string, (ActorSelection? NodeA, ActorSelection? NodeB)> _siteAddressCache = new();
/// <summary>
/// Tracks active debug view subscriptions: correlationId → (siteId, subscriber).
@@ -34,31 +38,30 @@ public class CentralCommunicationActor : ReceiveActor
/// </summary>
private readonly Dictionary<string, string> _inProgressDeployments = new();
public CentralCommunicationActor()
private ICancelable? _refreshSchedule;
public CentralCommunicationActor(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
// Site address cache loaded from database
Receive<SiteAddressCacheLoaded>(HandleSiteAddressCacheLoaded);
// Periodic refresh trigger
Receive<RefreshSiteAddresses>(_ => LoadSiteAddressesFromDb());
// Site registration via heartbeats
Receive<HeartbeatMessage>(HandleHeartbeat);
// Connection state changes
Receive<ConnectionStateChanged>(HandleConnectionStateChanged);
// Site registration command (manual or from discovery)
Receive<RegisterSite>(HandleRegisterSite);
// Route enveloped messages to sites
Receive<SiteEnvelope>(HandleSiteEnvelope);
}
private void HandleHeartbeat(HeartbeatMessage heartbeat)
{
// Heartbeats arrive from sites — forward to any interested central actors
// The sender's path tells us the site's communication actor address
if (!_siteSelections.ContainsKey(heartbeat.SiteId))
{
var senderPath = Sender.Path.ToString();
_log.Info("Learned site {0} from heartbeat at path {1}", heartbeat.SiteId, senderPath);
}
// Forward heartbeat to parent/subscribers (central health monitoring)
Context.Parent.Tell(heartbeat);
}
@@ -94,7 +97,7 @@ public class CentralCommunicationActor : ReceiveActor
_inProgressDeployments.Remove(deploymentId);
}
_siteSelections.Remove(msg.SiteId);
// Note: Do NOT remove from _siteAddressCache — addresses are persistent in the database
}
else
{
@@ -102,30 +105,71 @@ public class CentralCommunicationActor : ReceiveActor
}
}
private void HandleRegisterSite(RegisterSite msg)
{
var selection = Context.ActorSelection(msg.RemoteActorPath);
_siteSelections[msg.SiteId] = selection;
_log.Info("Registered site {0} at path {1}", msg.SiteId, msg.RemoteActorPath);
}
private void HandleSiteEnvelope(SiteEnvelope envelope)
{
if (!_siteSelections.TryGetValue(envelope.SiteId, out var siteSelection))
if (!_siteAddressCache.TryGetValue(envelope.SiteId, out var entry))
{
_log.Warning("No known path for site {0}, cannot route message {1}",
_log.Warning("No known address for site {0}, cannot route message {1}",
envelope.SiteId, envelope.Message.GetType().Name);
// The Ask will timeout on the caller side — no central buffering (WP-5)
return;
}
// Prefer NodeA, fall back to NodeB
var selection = entry.NodeA ?? entry.NodeB;
if (selection == null)
{
_log.Warning("Site {0} has no configured node addresses, cannot route message {1}",
envelope.SiteId, envelope.Message.GetType().Name);
return;
}
// Track debug subscriptions for cleanup on disconnect
TrackMessageForCleanup(envelope);
// Forward the inner message to the site, preserving the original sender
// so the site can reply directly to the caller (completing the Ask pattern)
siteSelection.Tell(envelope.Message, Sender);
selection.Tell(envelope.Message, Sender);
}
private void LoadSiteAddressesFromDb()
{
var self = Self;
Task.Run(async () =>
{
using var scope = _serviceProvider.CreateScope();
var repo = scope.ServiceProvider.GetRequiredService<ISiteRepository>();
var sites = await repo.GetAllSitesAsync();
var cache = new Dictionary<string, (string? NodeAAddress, string? NodeBAddress)>();
foreach (var site in sites)
{
if (string.IsNullOrWhiteSpace(site.NodeAAddress) && string.IsNullOrWhiteSpace(site.NodeBAddress))
continue;
cache[site.SiteIdentifier] = (site.NodeAAddress, site.NodeBAddress);
}
return new SiteAddressCacheLoaded(cache);
}).PipeTo(self);
}
private void HandleSiteAddressCacheLoaded(SiteAddressCacheLoaded msg)
{
var newCache = new Dictionary<string, (ActorSelection? NodeA, ActorSelection? NodeB)>();
foreach (var (siteId, (nodeAAddr, nodeBAddr)) in msg.Addresses)
{
var nodeA = !string.IsNullOrWhiteSpace(nodeAAddr)
? Context.ActorSelection(nodeAAddr)
: null;
var nodeB = !string.IsNullOrWhiteSpace(nodeBAddr)
? Context.ActorSelection(nodeBAddr)
: null;
newCache[siteId] = (nodeA, nodeB);
}
_siteAddressCache = newCache;
_log.Info("Site address cache refreshed with {0} site(s)", _siteAddressCache.Count);
}
private void TrackMessageForCleanup(SiteEnvelope envelope)
@@ -149,11 +193,20 @@ public class CentralCommunicationActor : ReceiveActor
protected override void PreStart()
{
_log.Info("CentralCommunicationActor started");
// Schedule periodic refresh of site addresses from the database
_refreshSchedule = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
TimeSpan.Zero,
TimeSpan.FromSeconds(60),
Self,
new RefreshSiteAddresses(),
ActorRefs.NoSender);
}
protected override void PostStop()
{
_log.Info("CentralCommunicationActor stopped. In-progress deployments treated as failed (WP-5).");
_refreshSchedule?.Cancel();
// On central failover, all in-progress deployments are failed
_inProgressDeployments.Clear();
_debugSubscriptions.Clear();
@@ -161,9 +214,15 @@ public class CentralCommunicationActor : ReceiveActor
}
/// <summary>
/// Command to register a site's remote communication actor path.
/// Command to trigger a refresh of site addresses from the database.
/// </summary>
public record RegisterSite(string SiteId, string RemoteActorPath);
public record RefreshSiteAddresses;
/// <summary>
/// Internal message carrying the loaded site address data from the database.
/// ActorSelection creation happens on the actor thread in HandleSiteAddressCacheLoaded.
/// </summary>
internal record SiteAddressCacheLoaded(Dictionary<string, (string? NodeAAddress, string? NodeBAddress)> Addresses);
/// <summary>
/// Notification sent to debug view subscribers when the stream is terminated

View File

@@ -9,6 +9,7 @@ using ScadaLink.Commons.Messages.InboundApi;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.Commons.Messages.Lifecycle;
using ScadaLink.Commons.Messages.RemoteQuery;
using ScadaLink.Communication.Actors;
namespace ScadaLink.Communication;
@@ -39,6 +40,14 @@ public class CommunicationService
_centralCommunicationActor = centralCommunicationActor;
}
/// <summary>
/// Triggers an immediate refresh of the site address cache from the database.
/// </summary>
public void RefreshSiteAddresses()
{
GetActor().Tell(new RefreshSiteAddresses());
}
private IActorRef GetActor()
{
return _centralCommunicationActor