using Akka.Actor;
using Akka.Cluster;
using Akka.Event;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Messages;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence;
using ZB.MOM.WW.ScadaBridge.StoreAndForward;

namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors;

/// <summary>
/// Runs on every site node (not a singleton). Handles both config and S&amp;F replication
/// between site cluster peers.
/// </summary>
/// <remarks>
/// <para>
/// Outbound: receives local replication requests and forwards them to the peer via
/// <see cref="ActorSelection"/> (see <see cref="SendToPeer"/>).
/// </para>
/// <para>
/// Inbound: receives replicated operations from the peer and applies them to local SQLite
/// storage.
/// </para>
/// <para>
/// Uses fire-and-forget (<c>Tell</c>) — no ack wait per design. Only one peer address is
/// tracked at a time; outbound messages sent while no peer is tracked are dropped.
/// </para>
/// </remarks>
public class SiteReplicationActor : ReceiveActor
{
    private readonly SiteStorageService _storage;
    private readonly StoreAndForwardStorage _sfStorage;
    private readonly ReplicationService _replicationService;

    // Null on nodes/tests without a fetcher; inbound deploy notifications are then skipped
    // with a warning (see HandleApplyConfigDeploy).
    private readonly IDeploymentConfigFetcher? _configFetcher;

    // Cluster role a member must carry to be considered this node's replication peer.
    private readonly string _siteRole;
    private readonly ILogger _logger;
    private readonly Cluster _cluster;

    // Address of the currently tracked peer node; null while no peer is known.
    // Set by TryTrackPeer, cleared by HandleMemberRemoved.
    private Address? _peerAddress;

    /// <summary>
    /// Initializes a new <see cref="SiteReplicationActor"/> and registers Akka message handlers
    /// for cluster membership events, for outbound replication requests (re-wrapped as
    /// <c>Apply*</c> messages and sent to the peer), and for inbound <c>Apply*</c> messages
    /// from the peer.
    /// </summary>
    /// <param name="storage">Service for accessing local site storage.</param>
    /// <param name="sfStorage">Store-and-forward SQLite storage for replication of buffered messages.</param>
    /// <param name="replicationService">
    /// Service providing replication transport logic; used here to apply replicated S&amp;F
    /// operations to <paramref name="sfStorage"/>.
    /// </param>
    /// <param name="siteRole">Akka cluster role used to identify peer nodes to replicate to.</param>
    /// <param name="logger">Logger instance.</param>
    /// <param name="configFetcher">
    /// Fetches a deployed instance's config JSON from central over HTTP. Used by the
    /// notify-and-fetch standby apply path (<see cref="HandleApplyConfigDeploy"/>): the peer
    /// replicates only the deployment id, and the standby fetches the config itself so a large
    /// config never crosses the intra-site Akka hop. Null on nodes/tests without a fetcher.
    /// </param>
    public SiteReplicationActor(
        SiteStorageService storage,
        StoreAndForwardStorage sfStorage,
        ReplicationService replicationService,
        string siteRole,
        ILogger logger,
        IDeploymentConfigFetcher? configFetcher = null)
    {
        _storage = storage;
        _sfStorage = sfStorage;
        _replicationService = replicationService;
        _configFetcher = configFetcher;
        _siteRole = siteRole;
        _logger = logger;
        _cluster = Cluster.Get(Context.System);

        // Cluster member events (the subscription itself is established in PreStart).
        Receive(HandleMemberUp);
        Receive(HandleMemberRemoved);
        Receive(HandleCurrentClusterState);

        // Outbound — forward to peer. Each local replication request is re-wrapped in the
        // matching Apply* message that the peer's inbound handlers expect. The deploy
        // message carries only the id, revision hash, enabled flag and central fetch
        // coordinates — never the config JSON itself (notify-and-fetch).
        Receive(msg => SendToPeer(new ApplyConfigDeploy(
            msg.InstanceName, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled,
            msg.CentralFetchBaseUrl, msg.FetchToken)));
        Receive(msg => SendToPeer(new ApplyConfigRemove(msg.InstanceName)));
        Receive(msg => SendToPeer(new ApplyConfigSetEnabled(
            msg.InstanceName, msg.IsEnabled)));
        Receive(msg => SendToPeer(new ApplyArtifacts(msg.Command)));
        Receive(msg => SendToPeer(new ApplyStoreAndForward(msg.Operation)));

        // Inbound — apply from peer
        Receive(HandleApplyConfigDeploy);
        Receive(HandleApplyConfigRemove);
        Receive(HandleApplyConfigSetEnabled);
        Receive(HandleApplyArtifacts);
        Receive(HandleApplyStoreAndForward);
    }

    /// <inheritdoc/>
    /// <remarks>
    /// Subscribes to <see cref="ClusterEvent.MemberUp"/> and <see cref="ClusterEvent.MemberRemoved"/>.
    /// With <c>InitialStateAsSnapshot</c> the current membership is delivered as a
    /// <see cref="ClusterEvent.CurrentClusterState"/> message (handled by
    /// <see cref="HandleCurrentClusterState"/>). As a result, a peer that was already Up before
    /// this actor started is still picked up.
    /// </remarks>
    protected override void PreStart()
    {
        base.PreStart();
        _cluster.Subscribe(Self, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot,
            typeof(ClusterEvent.MemberUp), typeof(ClusterEvent.MemberRemoved));
        _logger.LogInformation("SiteReplicationActor started, subscribing to cluster events for role {Role}", _siteRole);
    }

    /// <inheritdoc/>
    /// <remarks>Unsubscribes from cluster events before the base cleanup runs.</remarks>
    protected override void PostStop()
    {
        _cluster.Unsubscribe(Self);
        base.PostStop();
    }

    /// <summary>
    /// Handles the initial membership snapshot. Every member whose status is
    /// <see cref="MemberStatus.Up"/> is offered to <see cref="TryTrackPeer"/>.
    /// </summary>
    /// <param name="state">The cluster membership snapshot.</param>
    private void HandleCurrentClusterState(ClusterEvent.CurrentClusterState state)
    {
        foreach (var member in state.Members)
        {
            if (member.Status == MemberStatus.Up)
                TryTrackPeer(member);
        }
    }

    /// <summary>Offers a member that has just become Up to <see cref="TryTrackPeer"/>.</summary>
    /// <param name="evt">The member-up event.</param>
    private void HandleMemberUp(ClusterEvent.MemberUp evt)
    {
        TryTrackPeer(evt.Member);
    }

    /// <summary>
    /// Stops tracking the peer when the tracked peer's node is removed from the cluster.
    /// Removal of any other member is ignored.
    /// </summary>
    /// <param name="evt">The member-removed event.</param>
    private void HandleMemberRemoved(ClusterEvent.MemberRemoved evt)
    {
        if (evt.Member.Address.Equals(_peerAddress))
        {
            _logger.LogInformation("Peer node removed: {Address}", _peerAddress);
            _peerAddress = null;
        }
    }

    /// <summary>
    /// Records <paramref name="member"/> as the replication peer if it qualifies. Only one peer
    /// address is kept, so a later qualifying member replaces an earlier one.
    /// </summary>
    /// <param name="member">A cluster member that is (or has just become) Up.</param>
    private void TryTrackPeer(Member member)
    {
        // Must have our site role, and must not be self
        if (member.HasRole(_siteRole) && !member.Address.Equals(_cluster.SelfAddress))
        {
            _peerAddress = member.Address;
            _logger.LogInformation("Peer node tracked: {Address}", _peerAddress);
        }
    }

    /// <summary>
    /// Forwards a replication message to the tracked peer node's site-replication actor
    /// (fire-and-forget, dropped when no peer is tracked).
    /// </summary>
    /// <remarks>
    /// Declared <c>protected virtual</c> so tests can intercept the peer send without standing
    /// up a real two-node cluster.
    /// </remarks>
    /// <param name="message">The <c>Apply*</c> message to deliver to the peer.</param>
    protected virtual void SendToPeer(object message)
    {
        if (_peerAddress == null)
        {
            _logger.LogDebug("No peer available, dropping replication message {Type}", message.GetType().Name);
            return;
        }

        // The peer is addressed by a fixed path. This presumably requires the actor to be
        // created as "/user/site-replication" on every node — confirm against the actor registration.
        var path = new RootActorPath(_peerAddress) / "user" / "site-replication";

        // Tell gives no delivery feedback: if the peer is unreachable or the selection
        // resolves to nothing, the message is lost (no ack by design).
        Context.ActorSelection(path).Tell(message);
    }

    // ── Inbound handlers ──

    /// <summary>
    /// Applies a replicated deploy notification from the peer using notify-and-fetch. The
    /// message carries only the deployment id and the central fetch coordinates; this node
    /// fetches the config JSON from central itself and then stores it with a guarded write.
    /// </summary>
    /// <remarks>
    /// Skipped (and logged) when the message has no fetch coordinates or no fetcher is
    /// available. The fetch and the write complete off the actor thread. The continuation only
    /// uses <c>_storage</c>, <c>_logger</c> and the captured <paramref name="msg"/>, never actor
    /// state such as <c>Context</c> or <c>Self</c>.
    /// </remarks>
    /// <param name="msg">The replicated deploy notification.</param>
    private void HandleApplyConfigDeploy(ApplyConfigDeploy msg)
    {
        if (string.IsNullOrEmpty(msg.CentralFetchBaseUrl))
        {
            // The still-present direct DeployInstanceCommand wire path (retired in Task 14)
            // replicates with empty coords; there is nothing to fetch. Skip quietly rather
            // than calling FetchAsync("") and logging an error — T18 reconciliation backstops.
            _logger.LogDebug(
                "No fetch coords for {Instance} (deployment {DeploymentId}) — skipping replicated fetch; T18 reconciliation is the backstop",
                msg.InstanceName, msg.DeploymentId);
            return;
        }

        if (_configFetcher is null)
        {
            _logger.LogWarning(
                "No config fetcher available; cannot apply replicated config for {Instance} (deployment {DeploymentId}) — reconciliation will backstop",
                msg.InstanceName, msg.DeploymentId);
            return;
        }

        _logger.LogInformation(
            "Replicating config for {Instance} (deployment {DeploymentId}) — fetching from central",
            msg.InstanceName, msg.DeploymentId);

        // Notify-and-fetch: the peer sent only the id, so the standby fetches the config
        // itself (off-thread; best-effort fire-and-forget, matching the no-ack replication
        // model). The guarded write only overwrites a strictly-older local row. A single
        // fetch attempt — T18 reconciliation is the durable backstop for a lost fetch.
        // NOTE(review): ContinueWith without an explicit TaskScheduler uses
        // TaskScheduler.Current; consider passing TaskScheduler.Default (CA2008).
        _configFetcher.FetchAsync(msg.CentralFetchBaseUrl, msg.DeploymentId, msg.FetchToken, CancellationToken.None)
            .ContinueWith(async t =>
            {
                try
                {
                    if (t.IsCompletedSuccessfully)
                    {
                        await _storage.StoreDeployedConfigIfNewerAsync(
                            msg.InstanceName, t.Result, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled);
                        return;
                    }

                    // Fetch did not succeed. The branches are checked in order: a superseded or
                    // expired deployment is an expected outcome (info), cancellation is a
                    // warning, and anything else is an error. For a cancelled task
                    // t.Exception is null, so ex is null.
                    var ex = t.Exception?.GetBaseException();
                    if (ex is DeploymentConfigFetchException { IsSuperseded: true })
                        _logger.LogInformation(
                            "Skip replicated config for {Instance}: superseded/expired (a newer deploy will replicate)",
                            msg.InstanceName);
                    else if (t.IsCanceled)
                        _logger.LogWarning(
                            "Replicated config fetch cancelled for {Instance} (deployment {DeploymentId})",
                            msg.InstanceName, msg.DeploymentId);
                    else
                        _logger.LogError(ex,
                            "Replicated config fetch failed for {Instance} (deployment {DeploymentId})",
                            msg.InstanceName, msg.DeploymentId);
                }
                catch (Exception writeEx)
                {
                    // Guarded-write failure is best-effort; observe + log so nothing faults silently.
                    _logger.LogError(writeEx,
                        "Failed to write replicated config for {Instance} (deployment {DeploymentId})",
                        msg.InstanceName, msg.DeploymentId);
                }
            })
            // Unwrap flattens the Task<Task> produced by the async continuation. The result is
            // intentionally not awaited (fire-and-forget); failures are logged inside.
            .Unwrap();
    }

    /// <summary>
    /// Applies a replicated instance-config removal from the peer (best-effort; a fault is
    /// logged, not retried).
    /// </summary>
    /// <remarks>
    /// NOTE(review): this is not ordered against a still-pending fetch started by an earlier
    /// <see cref="HandleApplyConfigDeploy"/>. If that fetch completes after this remove, its
    /// guarded write may re-create the row. Confirm how StoreDeployedConfigIfNewerAsync behaves
    /// when no local row exists, or rely on reconciliation.
    /// </remarks>
    /// <param name="msg">The replicated remove request.</param>
    private void HandleApplyConfigRemove(ApplyConfigRemove msg)
    {
        _logger.LogInformation("Applying replicated config remove for {Instance}", msg.InstanceName);
        _storage.RemoveDeployedConfigAsync(msg.InstanceName)
            .ContinueWith(t =>
            {
                if (t.IsFaulted)
                    _logger.LogError(t.Exception, "Failed to apply replicated remove for {Instance}", msg.InstanceName);
            });
    }

    /// <summary>
    /// Applies a replicated enable/disable of an instance from the peer (best-effort; a fault
    /// is logged, not retried).
    /// </summary>
    /// <param name="msg">The replicated set-enabled request.</param>
    private void HandleApplyConfigSetEnabled(ApplyConfigSetEnabled msg)
    {
        _logger.LogInformation("Applying replicated set-enabled={Enabled} for {Instance}", msg.IsEnabled, msg.InstanceName);
        _storage.SetInstanceEnabledAsync(msg.InstanceName, msg.IsEnabled)
            .ContinueWith(t =>
            {
                if (t.IsFaulted)
                    _logger.LogError(t.Exception, "Failed to apply replicated set-enabled for {Instance}", msg.InstanceName);
            });
    }

    /// <summary>
    /// Applies replicated artifacts from the peer's deployment command to local storage, and
    /// purges central-only notification configuration.
    /// </summary>
    /// <remarks>
    /// Runs on the thread pool via <c>Task.Run</c>. Writes run sequentially in this order:
    /// shared scripts, external systems, database connections, the notification-config purge,
    /// then data connection definitions. Null collections are skipped. The first failure
    /// aborts the remaining writes for this message and is logged.
    /// NOTE(review): separate ApplyArtifacts messages are not serialized against each other,
    /// so writes from two messages arriving close together can interleave.
    /// </remarks>
    /// <param name="msg">The replicated artifacts message.</param>
    private void HandleApplyArtifacts(ApplyArtifacts msg)
    {
        var command = msg.Command;
        _logger.LogInformation("Applying replicated artifacts, deploymentId={DeploymentId}", command.DeploymentId);

        Task.Run(async () =>
        {
            try
            {
                if (command.SharedScripts != null)
                    foreach (var s in command.SharedScripts)
                        await _storage.StoreSharedScriptAsync(s.Name, s.Code, s.ParameterDefinitions, s.ReturnDefinition);

                if (command.ExternalSystems != null)
                    foreach (var es in command.ExternalSystems)
                        await _storage.StoreExternalSystemAsync(es.Name, es.EndpointUrl, es.AuthType, es.AuthConfiguration, es.MethodDefinitionsJson);

                if (command.DatabaseConnections != null)
                    foreach (var db in command.DatabaseConnections)
                        await _storage.StoreDatabaseConnectionAsync(db.Name, db.ConnectionString, db.MaxRetries, db.RetryDelay);

                // DeploymentManager-025 / SiteRuntime-031: notification lists and SMTP
                // configuration are central-only and are never persisted on a site.
                // Mirror the primary apply path: purge any pre-fix rows (including the
                // plaintext SMTP password) instead of writing the command's
                // (now-always-null) NotificationLists/SmtpConfigurations.
                await _storage.PurgeCentralOnlyNotificationConfigAsync();

                if (command.DataConnections != null)
                    foreach (var dc in command.DataConnections)
                        await _storage.StoreDataConnectionDefinitionAsync(dc.Name, dc.Protocol, dc.PrimaryConfigurationJson, dc.BackupConfigurationJson, dc.FailoverRetryCount);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to apply replicated artifacts");
            }
        });
    }

    /// <summary>
    /// Applies a replicated store-and-forward operation to the local S&amp;F storage through
    /// the replication service (best-effort; a fault is logged, not retried).
    /// </summary>
    /// <param name="msg">The replicated S&amp;F operation.</param>
    private void HandleApplyStoreAndForward(ApplyStoreAndForward msg)
    {
        _logger.LogDebug("Applying replicated S&F operation {OpType} for message {Id}",
            msg.Operation.OperationType, msg.Operation.MessageId);
        _replicationService.ApplyReplicatedOperationAsync(msg.Operation, _sfStorage)
            .ContinueWith(t =>
            {
                if (t.IsFaulted)
                    _logger.LogError(t.Exception, "Failed to apply replicated S&F operation {Id}", msg.Operation.MessageId);
            });
    }
}