using Akka.Actor;
using Akka.Cluster;
using Akka.Event;
using Microsoft.Extensions.Logging;
using ScadaLink.SiteRuntime.Messages;
using ScadaLink.SiteRuntime.Persistence;
using ScadaLink.StoreAndForward;

namespace ScadaLink.SiteRuntime.Actors;

/// <summary>
/// Runs on every site node (not a singleton). Handles both config and store-and-forward
/// (S&amp;F) replication between site cluster peers.
///
/// Outbound: receives local replication requests and forwards them to the peer via ActorSelection.
/// Inbound: receives replicated operations from the peer and applies them to local SQLite.
/// Uses fire-and-forget (Tell) — no ack wait per design.
/// </summary>
/// <remarks>
/// The peer is discovered from cluster membership events: any Up member that carries the
/// configured site role and whose address differs from this node's own address is tracked
/// as the peer. While no peer is tracked, outbound replication messages are dropped (logged
/// at debug level), not buffered.
/// </remarks>
public class SiteReplicationActor : ReceiveActor
{
    // Local persistence used by the inbound config and artifact handlers.
    private readonly SiteStorageService _storage;

    // Passed through to ReplicationService when applying replicated S&F operations.
    private readonly StoreAndForwardStorage _sfStorage;

    // Applies replicated S&F operations received from the peer.
    private readonly ReplicationService _replicationService;

    // Cluster role a member must have to be considered this node's replication peer.
    private readonly string _siteRole;

    private readonly ILogger _logger;

    // Cluster extension for this actor system; source of membership events and SelfAddress.
    private readonly Cluster _cluster;

    // Address of the currently tracked peer, or null when no peer is known.
    private Address? _peerAddress;

    /// <summary>
    /// Stores the collaborators, resolves the cluster extension, and registers the message
    /// handlers for cluster events, outbound replication requests, and inbound replicated
    /// operations.
    /// </summary>
    /// <param name="storage">Storage that inbound config and artifact operations are written to.</param>
    /// <param name="sfStorage">Storage handed to <paramref name="replicationService"/> for S&amp;F operations.</param>
    /// <param name="replicationService">Service that applies replicated S&amp;F operations.</param>
    /// <param name="siteRole">Cluster role used to recognise the peer node.</param>
    /// <param name="logger">Logger for lifecycle, peer-tracking and failure messages.</param>
    public SiteReplicationActor(
        SiteStorageService storage,
        StoreAndForwardStorage sfStorage,
        ReplicationService replicationService,
        string siteRole,
        ILogger logger)
    {
        _storage = storage;
        _sfStorage = sfStorage;
        _replicationService = replicationService;
        _siteRole = siteRole;
        _logger = logger;
        _cluster = Cluster.Get(Context.System);

        // NOTE(review): as reviewed, none of the Receive registrations below carries an explicit
        // message type argument. The outbound lambdas in particular need one for `msg` to be
        // typed — confirm these registrations against the real file before relying on this text.

        // Cluster member events
        Receive(HandleMemberUp);
        Receive(HandleMemberRemoved);
        Receive(HandleCurrentClusterState);

        // Outbound — forward to peer.
        // Each local request is re-wrapped as the matching Apply* message, copying its fields.
        Receive(msg => SendToPeer(new ApplyConfigDeploy(
            msg.InstanceName, msg.ConfigJson, msg.DeploymentId,
            msg.RevisionHash, msg.IsEnabled)));
        Receive(msg => SendToPeer(new ApplyConfigRemove(msg.InstanceName)));
        Receive(msg => SendToPeer(new ApplyConfigSetEnabled(
            msg.InstanceName, msg.IsEnabled)));
        Receive(msg => SendToPeer(new ApplyArtifacts(msg.Command)));
        Receive(msg => SendToPeer(new ApplyStoreAndForward(msg.Operation)));

        // Inbound — apply from peer
        Receive(HandleApplyConfigDeploy);
        Receive(HandleApplyConfigRemove);
        Receive(HandleApplyConfigSetEnabled);
        Receive(HandleApplyArtifacts);
        Receive(HandleApplyStoreAndForward);
    }

    /// <summary>
    /// Subscribes this actor to <c>MemberUp</c> and <c>MemberRemoved</c> cluster events,
    /// requesting the initial cluster state as a snapshot, and logs the start-up.
    /// </summary>
    protected override void PreStart()
    {
        base.PreStart();
        _cluster.Subscribe(Self, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot,
            typeof(ClusterEvent.MemberUp),
            typeof(ClusterEvent.MemberRemoved));

        _logger.LogInformation("SiteReplicationActor started, subscribing to cluster events for role {Role}", _siteRole);
    }

    /// <summary>
    /// Removes this actor's cluster event subscription before the base stop logic runs.
    /// </summary>
    protected override void PostStop()
    {
        _cluster.Unsubscribe(Self);
        base.PostStop();
    }

    /// <summary>
    /// Processes a cluster state snapshot: every member whose status is Up is offered to
    /// <see cref="TryTrackPeer"/>. Members in any other status are ignored.
    /// </summary>
    private void HandleCurrentClusterState(ClusterEvent.CurrentClusterState state)
    {
        foreach (var member in state.Members)
        {
            if (member.Status == MemberStatus.Up)
                TryTrackPeer(member);
        }
    }

    /// <summary>
    /// Offers a member that has just become Up to <see cref="TryTrackPeer"/>.
    /// </summary>
    private void HandleMemberUp(ClusterEvent.MemberUp evt)
    {
        TryTrackPeer(evt.Member);
    }

    /// <summary>
    /// Clears the tracked peer when the removed member's address equals the tracked peer
    /// address. Removal of any other member leaves the tracked peer untouched.
    /// </summary>
    private void HandleMemberRemoved(ClusterEvent.MemberRemoved evt)
    {
        if (evt.Member.Address.Equals(_peerAddress))
        {
            _logger.LogInformation("Peer node removed: {Address}", _peerAddress);
            _peerAddress = null;
        }
    }

    /// <summary>
    /// Records <paramref name="member"/> as the replication peer if it qualifies.
    /// </summary>
    /// <remarks>
    /// A qualifying member replaces whatever peer was tracked before, and the tracking is
    /// logged each time.
    /// NOTE(review): this presumes at most one other node carries the site role — confirm
    /// against the cluster topology.
    /// </remarks>
    private void TryTrackPeer(Member member)
    {
        // Must have our site role, and must not be self
        if (member.HasRole(_siteRole) && !member.Address.Equals(_cluster.SelfAddress))
        {
            _peerAddress = member.Address;
            _logger.LogInformation("Peer node tracked: {Address}", _peerAddress);
        }
    }

    /// <summary>
    /// Sends <paramref name="message"/> to the peer with Tell, addressing it by actor
    /// selection at <c>user/site-replication</c> under the peer's root path.
    /// </summary>
    /// <remarks>
    /// When no peer is tracked the message is dropped and only a debug log entry is written.
    /// No reply or acknowledgement is awaited.
    /// NOTE(review): presumably the peer's instance of this actor is created under the name
    /// "site-replication" — verify against the place where the actor is spawned.
    /// </remarks>
    private void SendToPeer(object message)
    {
        if (_peerAddress == null)
        {
            _logger.LogDebug("No peer available, dropping replication message {Type}", message.GetType().Name);
            return;
        }

        var path = new RootActorPath(_peerAddress) / "user" / "site-replication";
        Context.ActorSelection(path).Tell(message);
    }

    // ── Inbound handlers ──
    //
    // Each handler below starts its storage operation and returns without awaiting it; a
    // continuation (or try/catch) only logs a failure.
    // NOTE(review): because the handler returns before the write completes, this actor alone
    // does not guarantee that two replicated operations reach storage in arrival order —
    // confirm whether the storage layer serializes them.

    /// <summary>
    /// Applies a replicated config deployment by storing the deployed config for the instance.
    /// The storage task is not awaited; if it faults, the exception is logged.
    /// </summary>
    private void HandleApplyConfigDeploy(ApplyConfigDeploy msg)
    {
        _logger.LogInformation("Applying replicated config deploy for {Instance}", msg.InstanceName);
        _storage.StoreDeployedConfigAsync(
            msg.InstanceName, msg.ConfigJson, msg.DeploymentId,
            msg.RevisionHash, msg.IsEnabled)
            .ContinueWith(t =>
            {
                if (t.IsFaulted)
                    _logger.LogError(t.Exception, "Failed to apply replicated deploy for {Instance}", msg.InstanceName);
            });
    }

    /// <summary>
    /// Applies a replicated config removal for the instance.
    /// The storage task is not awaited; if it faults, the exception is logged.
    /// </summary>
    private void HandleApplyConfigRemove(ApplyConfigRemove msg)
    {
        _logger.LogInformation("Applying replicated config remove for {Instance}", msg.InstanceName);
        _storage.RemoveDeployedConfigAsync(msg.InstanceName)
            .ContinueWith(t =>
            {
                if (t.IsFaulted)
                    _logger.LogError(t.Exception, "Failed to apply replicated remove for {Instance}", msg.InstanceName);
            });
    }

    /// <summary>
    /// Applies a replicated enabled/disabled change for the instance.
    /// The storage task is not awaited; if it faults, the exception is logged.
    /// </summary>
    private void HandleApplyConfigSetEnabled(ApplyConfigSetEnabled msg)
    {
        _logger.LogInformation("Applying replicated set-enabled={Enabled} for {Instance}", msg.IsEnabled, msg.InstanceName);
        _storage.SetInstanceEnabledAsync(msg.InstanceName, msg.IsEnabled)
            .ContinueWith(t =>
            {
                if (t.IsFaulted)
                    _logger.LogError(t.Exception, "Failed to apply replicated set-enabled for {Instance}", msg.InstanceName);
            });
    }

    /// <summary>
    /// Applies a replicated artifact deployment by storing every artifact carried by the command.
    /// </summary>
    /// <remarks>
    /// The work runs on a <c>Task.Run</c> task that is not awaited. Categories are stored one
    /// after another in the order written below, with each item awaited before the next;
    /// a category that is null is skipped. The first exception stops the remaining stores
    /// and is logged — nothing already stored is rolled back by this method.
    /// </remarks>
    private void HandleApplyArtifacts(ApplyArtifacts msg)
    {
        var command = msg.Command;
        _logger.LogInformation("Applying replicated artifacts, deploymentId={DeploymentId}", command.DeploymentId);

        Task.Run(async () =>
        {
            try
            {
                if (command.SharedScripts != null)
                    foreach (var s in command.SharedScripts)
                        await _storage.StoreSharedScriptAsync(s.Name, s.Code, s.ParameterDefinitions, s.ReturnDefinition);

                if (command.ExternalSystems != null)
                    foreach (var es in command.ExternalSystems)
                        await _storage.StoreExternalSystemAsync(es.Name, es.EndpointUrl, es.AuthType, es.AuthConfiguration, es.MethodDefinitionsJson);

                if (command.DatabaseConnections != null)
                    foreach (var db in command.DatabaseConnections)
                        await _storage.StoreDatabaseConnectionAsync(db.Name, db.ConnectionString, db.MaxRetries, db.RetryDelay);

                if (command.NotificationLists != null)
                    foreach (var nl in command.NotificationLists)
                        await _storage.StoreNotificationListAsync(nl.Name, nl.RecipientEmails);

                if (command.DataConnections != null)
                    foreach (var dc in command.DataConnections)
                        await _storage.StoreDataConnectionDefinitionAsync(dc.Name, dc.Protocol, dc.PrimaryConfigurationJson);

                if (command.SmtpConfigurations != null)
                    foreach (var smtp in command.SmtpConfigurations)
                        await _storage.StoreSmtpConfigurationAsync(
                            smtp.Name, smtp.Server, smtp.Port, smtp.AuthMode,
                            smtp.FromAddress, smtp.Username, smtp.Password, smtp.OAuthConfig);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to apply replicated artifacts");
            }
        });
    }

    /// <summary>
    /// Applies a replicated S&amp;F operation through the replication service, passing the
    /// local S&amp;F storage. The task is not awaited; if it faults, the exception is logged.
    /// </summary>
    private void HandleApplyStoreAndForward(ApplyStoreAndForward msg)
    {
        _logger.LogDebug("Applying replicated S&F operation {OpType} for message {Id}",
            msg.Operation.OperationType, msg.Operation.MessageId);
        _replicationService.ApplyReplicatedOperationAsync(msg.Operation, _sfStorage)
            .ContinueWith(t =>
            {
                if (t.IsFaulted)
                    _logger.LogError(t.Exception, "Failed to apply replicated S&F operation {Id}", msg.Operation.MessageId);
            });
    }
}