Files
ScadaBridge/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReplicationActor.cs
T

293 lines
13 KiB
C#

using Akka.Actor;
using Akka.Cluster;
using Akka.Event;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Messages;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence;
using ZB.MOM.WW.ScadaBridge.StoreAndForward;
namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors;
/// <summary>
/// Runs on every site node (not a singleton). Handles both config and S&amp;F replication
/// between site cluster peers.
///
/// Outbound: receives local replication requests and forwards to peer via ActorSelection.
/// Inbound: receives replicated operations from peer and applies to local SQLite.
/// Uses fire-and-forget (Tell) — no ack wait per design.
/// </summary>
public class SiteReplicationActor : ReceiveActor
{
private readonly SiteStorageService _storage;
private readonly StoreAndForwardStorage _sfStorage;
private readonly ReplicationService _replicationService;
private readonly IDeploymentConfigFetcher? _configFetcher;
private readonly string _siteRole;
private readonly ILogger<SiteReplicationActor> _logger;
private readonly Cluster _cluster;
private Address? _peerAddress;
/// <summary>
/// Initializes a new <see cref="SiteReplicationActor"/> and registers Akka message handlers.
/// </summary>
/// <param name="storage">Service for accessing local site storage.</param>
/// <param name="sfStorage">Store-and-forward SQLite storage for replication of buffered messages.</param>
/// <param name="replicationService">Service providing replication transport logic.</param>
/// <param name="siteRole">Akka cluster role used to identify peer nodes to replicate to.</param>
/// <param name="logger">Logger instance.</param>
/// <param name="configFetcher">
/// Fetches a deployed instance's config JSON from central over HTTP. Used by the
/// notify-and-fetch standby apply path (<see cref="HandleApplyConfigDeploy"/>): the peer
/// replicates only the deployment id, and the standby fetches the config itself so a large
/// config never crosses the intra-site Akka hop. Null on nodes/tests without a fetcher.
/// </param>
public SiteReplicationActor(
SiteStorageService storage,
StoreAndForwardStorage sfStorage,
ReplicationService replicationService,
string siteRole,
ILogger<SiteReplicationActor> logger,
IDeploymentConfigFetcher? configFetcher = null)
{
_storage = storage;
_sfStorage = sfStorage;
_replicationService = replicationService;
_configFetcher = configFetcher;
_siteRole = siteRole;
_logger = logger;
_cluster = Cluster.Get(Context.System);
// Cluster member events
Receive<ClusterEvent.MemberUp>(HandleMemberUp);
Receive<ClusterEvent.MemberRemoved>(HandleMemberRemoved);
Receive<ClusterEvent.CurrentClusterState>(HandleCurrentClusterState);
// Outbound — forward to peer
Receive<ReplicateConfigDeploy>(msg => SendToPeer(new ApplyConfigDeploy(
msg.InstanceName, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled,
msg.CentralFetchBaseUrl, msg.FetchToken)));
Receive<ReplicateConfigRemove>(msg => SendToPeer(new ApplyConfigRemove(msg.InstanceName)));
Receive<ReplicateConfigSetEnabled>(msg => SendToPeer(new ApplyConfigSetEnabled(
msg.InstanceName, msg.IsEnabled)));
Receive<ReplicateArtifacts>(msg => SendToPeer(new ApplyArtifacts(msg.Command)));
Receive<ReplicateStoreAndForward>(msg => SendToPeer(new ApplyStoreAndForward(msg.Operation)));
// Inbound — apply from peer
Receive<ApplyConfigDeploy>(HandleApplyConfigDeploy);
Receive<ApplyConfigRemove>(HandleApplyConfigRemove);
Receive<ApplyConfigSetEnabled>(HandleApplyConfigSetEnabled);
Receive<ApplyArtifacts>(HandleApplyArtifacts);
Receive<ApplyStoreAndForward>(HandleApplyStoreAndForward);
}
/// <inheritdoc />
protected override void PreStart()
{
base.PreStart();
_cluster.Subscribe(Self, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot,
typeof(ClusterEvent.MemberUp),
typeof(ClusterEvent.MemberRemoved));
_logger.LogInformation("SiteReplicationActor started, subscribing to cluster events for role {Role}", _siteRole);
}
/// <inheritdoc />
protected override void PostStop()
{
_cluster.Unsubscribe(Self);
base.PostStop();
}
private void HandleCurrentClusterState(ClusterEvent.CurrentClusterState state)
{
foreach (var member in state.Members)
{
if (member.Status == MemberStatus.Up)
TryTrackPeer(member);
}
}
private void HandleMemberUp(ClusterEvent.MemberUp evt)
{
TryTrackPeer(evt.Member);
}
private void HandleMemberRemoved(ClusterEvent.MemberRemoved evt)
{
if (evt.Member.Address.Equals(_peerAddress))
{
_logger.LogInformation("Peer node removed: {Address}", _peerAddress);
_peerAddress = null;
}
}
private void TryTrackPeer(Member member)
{
// Must have our site role, and must not be self
if (member.HasRole(_siteRole) && !member.Address.Equals(_cluster.SelfAddress))
{
_peerAddress = member.Address;
_logger.LogInformation("Peer node tracked: {Address}", _peerAddress);
}
}
/// <summary>
/// Forwards a replication message to the tracked peer node's <c>site-replication</c> actor
/// (fire-and-forget, dropped when no peer is tracked). <see langword="protected virtual"/>
/// so tests can intercept the peer send without standing up a real two-node cluster.
/// </summary>
protected virtual void SendToPeer(object message)
{
if (_peerAddress == null)
{
_logger.LogDebug("No peer available, dropping replication message {Type}", message.GetType().Name);
return;
}
var path = new RootActorPath(_peerAddress) / "user" / "site-replication";
Context.ActorSelection(path).Tell(message);
}
// ── Inbound handlers ──
private void HandleApplyConfigDeploy(ApplyConfigDeploy msg)
{
if (string.IsNullOrEmpty(msg.CentralFetchBaseUrl))
{
// The direct DeployInstanceCommand cross-cluster wire path was retired (Task 14).
// This guard is a defensive fallback: skip quietly rather than calling FetchAsync("")
// and logging a spurious error. T18 reconciliation backstops any missed writes.
_logger.LogDebug(
"No fetch coords for {Instance} (deployment {DeploymentId}) — skipping replicated fetch; T18 reconciliation is the backstop",
msg.InstanceName, msg.DeploymentId);
return;
}
if (_configFetcher is null)
{
_logger.LogWarning(
"No config fetcher available; cannot apply replicated config for {Instance} (deployment {DeploymentId}) — reconciliation will backstop",
msg.InstanceName, msg.DeploymentId);
return;
}
_logger.LogInformation(
"Replicating config for {Instance} (deployment {DeploymentId}) — fetching from central",
msg.InstanceName, msg.DeploymentId);
// Notify-and-fetch: the peer sent only the id, so the standby fetches the config
// itself (off-thread; best-effort fire-and-forget, matching the no-ack replication
// model). The guarded write only overwrites a strictly-older local row. A single
// fetch attempt — T18 reconciliation is the durable backstop for a lost fetch.
_configFetcher.FetchAsync(msg.CentralFetchBaseUrl, msg.DeploymentId, msg.FetchToken, CancellationToken.None)
.ContinueWith(async t =>
{
try
{
if (t.IsCompletedSuccessfully)
{
await _storage.StoreDeployedConfigIfNewerAsync(
msg.InstanceName, t.Result, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled);
return;
}
var ex = t.Exception?.GetBaseException();
if (ex is DeploymentConfigFetchException { IsSuperseded: true })
_logger.LogInformation(
"Skip replicated config for {Instance}: superseded/expired (a newer deploy will replicate)",
msg.InstanceName);
else if (t.IsCanceled)
_logger.LogWarning(
"Replicated config fetch cancelled for {Instance} (deployment {DeploymentId})",
msg.InstanceName, msg.DeploymentId);
else
_logger.LogError(ex,
"Replicated config fetch failed for {Instance} (deployment {DeploymentId})",
msg.InstanceName, msg.DeploymentId);
}
catch (Exception writeEx)
{
// Guarded-write failure is best-effort; observe + log so nothing faults silently.
_logger.LogError(writeEx,
"Failed to write replicated config for {Instance} (deployment {DeploymentId})",
msg.InstanceName, msg.DeploymentId);
}
})
.Unwrap();
}
private void HandleApplyConfigRemove(ApplyConfigRemove msg)
{
_logger.LogInformation("Applying replicated config remove for {Instance}", msg.InstanceName);
_storage.RemoveDeployedConfigAsync(msg.InstanceName)
.ContinueWith(t =>
{
if (t.IsFaulted)
_logger.LogError(t.Exception, "Failed to apply replicated remove for {Instance}", msg.InstanceName);
});
}
private void HandleApplyConfigSetEnabled(ApplyConfigSetEnabled msg)
{
_logger.LogInformation("Applying replicated set-enabled={Enabled} for {Instance}", msg.IsEnabled, msg.InstanceName);
_storage.SetInstanceEnabledAsync(msg.InstanceName, msg.IsEnabled)
.ContinueWith(t =>
{
if (t.IsFaulted)
_logger.LogError(t.Exception, "Failed to apply replicated set-enabled for {Instance}", msg.InstanceName);
});
}
private void HandleApplyArtifacts(ApplyArtifacts msg)
{
var command = msg.Command;
_logger.LogInformation("Applying replicated artifacts, deploymentId={DeploymentId}", command.DeploymentId);
Task.Run(async () =>
{
try
{
if (command.SharedScripts != null)
foreach (var s in command.SharedScripts)
await _storage.StoreSharedScriptAsync(s.Name, s.Code, s.ParameterDefinitions, s.ReturnDefinition);
if (command.ExternalSystems != null)
foreach (var es in command.ExternalSystems)
await _storage.StoreExternalSystemAsync(es.Name, es.EndpointUrl, es.AuthType, es.AuthConfiguration, es.MethodDefinitionsJson);
if (command.DatabaseConnections != null)
foreach (var db in command.DatabaseConnections)
await _storage.StoreDatabaseConnectionAsync(db.Name, db.ConnectionString, db.MaxRetries, db.RetryDelay);
// DeploymentManager-025 / SiteRuntime-031: notification lists and SMTP
// configuration are central-only and are never persisted on a site.
// Mirror the primary apply path: purge any pre-fix rows (including the
// plaintext SMTP password) instead of writing the command's
// (now-always-null) NotificationLists/SmtpConfigurations.
await _storage.PurgeCentralOnlyNotificationConfigAsync();
if (command.DataConnections != null)
foreach (var dc in command.DataConnections)
await _storage.StoreDataConnectionDefinitionAsync(dc.Name, dc.Protocol, dc.PrimaryConfigurationJson, dc.BackupConfigurationJson, dc.FailoverRetryCount);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to apply replicated artifacts");
}
});
}
private void HandleApplyStoreAndForward(ApplyStoreAndForward msg)
{
_logger.LogDebug("Applying replicated S&F operation {OpType} for message {Id}",
msg.Operation.OperationType, msg.Operation.MessageId);
_replicationService.ApplyReplicatedOperationAsync(msg.Operation, _sfStorage)
.ContinueWith(t =>
{
if (t.IsFaulted)
_logger.LogError(t.Exception, "Failed to apply replicated S&F operation {Id}", msg.Operation.MessageId);
});
}
}