feat: wire SQLite replication between site nodes and fix ConfigurationDatabase tests
Add SiteReplicationActor (runs on every site node) to replicate deployed configs and store-and-forward buffer operations to the standby peer via cluster member discovery and fire-and-forget Tell. Wire ReplicationService handler and pass replication actor to DeploymentManagerActor singleton. Fix 5 pre-existing ConfigurationDatabase test failures: RowVersion NOT NULL on SQLite, stale migration name assertion, and seed data count mismatch.
This commit is contained in:
214
src/ScadaLink.SiteRuntime/Actors/SiteReplicationActor.cs
Normal file
214
src/ScadaLink.SiteRuntime/Actors/SiteReplicationActor.cs
Normal file
@@ -0,0 +1,214 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster;
|
||||
using Akka.Event;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ScadaLink.SiteRuntime.Messages;
|
||||
using ScadaLink.SiteRuntime.Persistence;
|
||||
using ScadaLink.StoreAndForward;
|
||||
|
||||
namespace ScadaLink.SiteRuntime.Actors;
|
||||
|
||||
/// <summary>
|
||||
/// Runs on every site node (not a singleton). Handles both config and S&F replication
|
||||
/// between site cluster peers.
|
||||
///
|
||||
/// Outbound: receives local replication requests and forwards to peer via ActorSelection.
|
||||
/// Inbound: receives replicated operations from peer and applies to local SQLite.
|
||||
/// Uses fire-and-forget (Tell) — no ack wait per design.
|
||||
/// </summary>
|
||||
public class SiteReplicationActor : ReceiveActor
|
||||
{
|
||||
private readonly SiteStorageService _storage;
|
||||
private readonly StoreAndForwardStorage _sfStorage;
|
||||
private readonly ReplicationService _replicationService;
|
||||
private readonly string _siteRole;
|
||||
private readonly ILogger<SiteReplicationActor> _logger;
|
||||
private readonly Cluster _cluster;
|
||||
private Address? _peerAddress;
|
||||
|
||||
public SiteReplicationActor(
|
||||
SiteStorageService storage,
|
||||
StoreAndForwardStorage sfStorage,
|
||||
ReplicationService replicationService,
|
||||
string siteRole,
|
||||
ILogger<SiteReplicationActor> logger)
|
||||
{
|
||||
_storage = storage;
|
||||
_sfStorage = sfStorage;
|
||||
_replicationService = replicationService;
|
||||
_siteRole = siteRole;
|
||||
_logger = logger;
|
||||
_cluster = Cluster.Get(Context.System);
|
||||
|
||||
// Cluster member events
|
||||
Receive<ClusterEvent.MemberUp>(HandleMemberUp);
|
||||
Receive<ClusterEvent.MemberRemoved>(HandleMemberRemoved);
|
||||
Receive<ClusterEvent.CurrentClusterState>(HandleCurrentClusterState);
|
||||
|
||||
// Outbound — forward to peer
|
||||
Receive<ReplicateConfigDeploy>(msg => SendToPeer(new ApplyConfigDeploy(
|
||||
msg.InstanceName, msg.ConfigJson, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled)));
|
||||
Receive<ReplicateConfigRemove>(msg => SendToPeer(new ApplyConfigRemove(msg.InstanceName)));
|
||||
Receive<ReplicateConfigSetEnabled>(msg => SendToPeer(new ApplyConfigSetEnabled(
|
||||
msg.InstanceName, msg.IsEnabled)));
|
||||
Receive<ReplicateArtifacts>(msg => SendToPeer(new ApplyArtifacts(msg.Command)));
|
||||
Receive<ReplicateStoreAndForward>(msg => SendToPeer(new ApplyStoreAndForward(msg.Operation)));
|
||||
|
||||
// Inbound — apply from peer
|
||||
Receive<ApplyConfigDeploy>(HandleApplyConfigDeploy);
|
||||
Receive<ApplyConfigRemove>(HandleApplyConfigRemove);
|
||||
Receive<ApplyConfigSetEnabled>(HandleApplyConfigSetEnabled);
|
||||
Receive<ApplyArtifacts>(HandleApplyArtifacts);
|
||||
Receive<ApplyStoreAndForward>(HandleApplyStoreAndForward);
|
||||
}
|
||||
|
||||
protected override void PreStart()
|
||||
{
|
||||
base.PreStart();
|
||||
_cluster.Subscribe(Self, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot,
|
||||
typeof(ClusterEvent.MemberUp),
|
||||
typeof(ClusterEvent.MemberRemoved));
|
||||
_logger.LogInformation("SiteReplicationActor started, subscribing to cluster events for role {Role}", _siteRole);
|
||||
}
|
||||
|
||||
protected override void PostStop()
|
||||
{
|
||||
_cluster.Unsubscribe(Self);
|
||||
base.PostStop();
|
||||
}
|
||||
|
||||
private void HandleCurrentClusterState(ClusterEvent.CurrentClusterState state)
|
||||
{
|
||||
foreach (var member in state.Members)
|
||||
{
|
||||
if (member.Status == MemberStatus.Up)
|
||||
TryTrackPeer(member);
|
||||
}
|
||||
}
|
||||
|
||||
private void HandleMemberUp(ClusterEvent.MemberUp evt)
|
||||
{
|
||||
TryTrackPeer(evt.Member);
|
||||
}
|
||||
|
||||
private void HandleMemberRemoved(ClusterEvent.MemberRemoved evt)
|
||||
{
|
||||
if (evt.Member.Address.Equals(_peerAddress))
|
||||
{
|
||||
_logger.LogInformation("Peer node removed: {Address}", _peerAddress);
|
||||
_peerAddress = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void TryTrackPeer(Member member)
|
||||
{
|
||||
// Must have our site role, and must not be self
|
||||
if (member.HasRole(_siteRole) && !member.Address.Equals(_cluster.SelfAddress))
|
||||
{
|
||||
_peerAddress = member.Address;
|
||||
_logger.LogInformation("Peer node tracked: {Address}", _peerAddress);
|
||||
}
|
||||
}
|
||||
|
||||
private void SendToPeer(object message)
|
||||
{
|
||||
if (_peerAddress == null)
|
||||
{
|
||||
_logger.LogDebug("No peer available, dropping replication message {Type}", message.GetType().Name);
|
||||
return;
|
||||
}
|
||||
|
||||
var path = new RootActorPath(_peerAddress) / "user" / "site-replication";
|
||||
Context.ActorSelection(path).Tell(message);
|
||||
}
|
||||
|
||||
// ── Inbound handlers ──
|
||||
|
||||
private void HandleApplyConfigDeploy(ApplyConfigDeploy msg)
|
||||
{
|
||||
_logger.LogInformation("Applying replicated config deploy for {Instance}", msg.InstanceName);
|
||||
_storage.StoreDeployedConfigAsync(
|
||||
msg.InstanceName, msg.ConfigJson, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled)
|
||||
.ContinueWith(t =>
|
||||
{
|
||||
if (t.IsFaulted)
|
||||
_logger.LogError(t.Exception, "Failed to apply replicated deploy for {Instance}", msg.InstanceName);
|
||||
});
|
||||
}
|
||||
|
||||
private void HandleApplyConfigRemove(ApplyConfigRemove msg)
|
||||
{
|
||||
_logger.LogInformation("Applying replicated config remove for {Instance}", msg.InstanceName);
|
||||
_storage.RemoveDeployedConfigAsync(msg.InstanceName)
|
||||
.ContinueWith(t =>
|
||||
{
|
||||
if (t.IsFaulted)
|
||||
_logger.LogError(t.Exception, "Failed to apply replicated remove for {Instance}", msg.InstanceName);
|
||||
});
|
||||
}
|
||||
|
||||
private void HandleApplyConfigSetEnabled(ApplyConfigSetEnabled msg)
|
||||
{
|
||||
_logger.LogInformation("Applying replicated set-enabled={Enabled} for {Instance}", msg.IsEnabled, msg.InstanceName);
|
||||
_storage.SetInstanceEnabledAsync(msg.InstanceName, msg.IsEnabled)
|
||||
.ContinueWith(t =>
|
||||
{
|
||||
if (t.IsFaulted)
|
||||
_logger.LogError(t.Exception, "Failed to apply replicated set-enabled for {Instance}", msg.InstanceName);
|
||||
});
|
||||
}
|
||||
|
||||
private void HandleApplyArtifacts(ApplyArtifacts msg)
|
||||
{
|
||||
var command = msg.Command;
|
||||
_logger.LogInformation("Applying replicated artifacts, deploymentId={DeploymentId}", command.DeploymentId);
|
||||
|
||||
Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
if (command.SharedScripts != null)
|
||||
foreach (var s in command.SharedScripts)
|
||||
await _storage.StoreSharedScriptAsync(s.Name, s.Code, s.ParameterDefinitions, s.ReturnDefinition);
|
||||
|
||||
if (command.ExternalSystems != null)
|
||||
foreach (var es in command.ExternalSystems)
|
||||
await _storage.StoreExternalSystemAsync(es.Name, es.EndpointUrl, es.AuthType, es.AuthConfiguration, es.MethodDefinitionsJson);
|
||||
|
||||
if (command.DatabaseConnections != null)
|
||||
foreach (var db in command.DatabaseConnections)
|
||||
await _storage.StoreDatabaseConnectionAsync(db.Name, db.ConnectionString, db.MaxRetries, db.RetryDelay);
|
||||
|
||||
if (command.NotificationLists != null)
|
||||
foreach (var nl in command.NotificationLists)
|
||||
await _storage.StoreNotificationListAsync(nl.Name, nl.RecipientEmails);
|
||||
|
||||
if (command.DataConnections != null)
|
||||
foreach (var dc in command.DataConnections)
|
||||
await _storage.StoreDataConnectionDefinitionAsync(dc.Name, dc.Protocol, dc.ConfigurationJson);
|
||||
|
||||
if (command.SmtpConfigurations != null)
|
||||
foreach (var smtp in command.SmtpConfigurations)
|
||||
await _storage.StoreSmtpConfigurationAsync(smtp.Name, smtp.Server, smtp.Port, smtp.AuthMode,
|
||||
smtp.FromAddress, smtp.Username, smtp.Password, smtp.OAuthConfig);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to apply replicated artifacts");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void HandleApplyStoreAndForward(ApplyStoreAndForward msg)
|
||||
{
|
||||
_logger.LogDebug("Applying replicated S&F operation {OpType} for message {Id}",
|
||||
msg.Operation.OperationType, msg.Operation.MessageId);
|
||||
|
||||
_replicationService.ApplyReplicatedOperationAsync(msg.Operation, _sfStorage)
|
||||
.ContinueWith(t =>
|
||||
{
|
||||
if (t.IsFaulted)
|
||||
_logger.LogError(t.Exception, "Failed to apply replicated S&F operation {Id}", msg.Operation.MessageId);
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user