using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;

namespace ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations;

/// <summary>
/// Cluster-singleton admin operations actor. Owns the "snapshot the live-edit state and start
/// a deployment" workflow, the driver connectivity probe, and the driver restart/reconnect
/// commands invoked by the admin UI. Reachable from anywhere in the cluster.
/// </summary>
/// <remarks>
/// Every mutating operation follows the same order: persist first (deployment row and/or
/// <c>ConfigEdit</c> audit row), then dispatch. A reply of success therefore means the
/// operation was both recorded and sent. A reply of failure means nothing was dispatched.
/// </remarks>
public sealed class AdminOperationsActor : ReceiveActor
{
    private readonly IDbContextFactory _dbFactory;
    private readonly IActorRef _coordinator;
    private readonly IReadOnlyDictionary _probesByType;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    /// <summary>Creates actor props for the admin operations actor.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">Driver probes registered in DI; keyed by DriverType (case-insensitive).</param>
    /// <returns>Props configured to create an <c>AdminOperationsActor</c>.</returns>
    public static Props Props(
        IDbContextFactory dbFactory,
        IActorRef coordinator,
        IEnumerable probes) =>
        Akka.Actor.Props.Create(() => new AdminOperationsActor(dbFactory, coordinator, probes));

    /// <summary>Initializes a new instance of the <c>AdminOperationsActor</c>.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">Driver probes registered in DI; keyed by DriverType (case-insensitive).</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public AdminOperationsActor(
        IDbContextFactory dbFactory,
        IActorRef coordinator,
        IEnumerable probes)
    {
        ArgumentNullException.ThrowIfNull(dbFactory);
        ArgumentNullException.ThrowIfNull(coordinator);
        ArgumentNullException.ThrowIfNull(probes);

        _dbFactory = dbFactory;
        _coordinator = coordinator;
        // Case-insensitive so "Modbus" and "modbus" resolve to the same probe.
        _probesByType = probes.ToDictionary(p => p.DriverType, StringComparer.OrdinalIgnoreCase);

        ReceiveAsync(HandleStartDeploymentAsync);
        ReceiveAsync(HandleTestDriverConnectAsync);
        ReceiveAsync(HandleRestartDriverAsync);
        ReceiveAsync(HandleReconnectDriverAsync);
    }

    /// <summary>Host part of this node's cluster address, or "unknown" when it has none.</summary>
    private string SelfNodeHost =>
        Akka.Cluster.Cluster.Get(Context.System).SelfAddress.Host ?? "unknown";

    /// <summary>
    /// Snapshots and flattens the live-edit configuration, records a new <c>Deployment</c>
    /// (status Dispatching) plus a marker <c>ConfigEdit</c> audit row, then asks the coordinator
    /// to dispatch it. Replies with a <c>StartDeploymentResult</c>: <c>Accepted</c>,
    /// <c>AnotherDeploymentInFlight</c>, or <c>Rejected</c> (on any exception).
    /// </summary>
    private async Task HandleStartDeploymentAsync(StartDeployment msg)
    {
        // Capture before the first await; Sender is per-message actor state.
        var replyTo = Sender;
        try
        {
            await using var db = await _dbFactory.CreateDbContextAsync();

            // Refuse if any deployment is already in flight — keeps the coordinator's state
            // unambiguous. The UI is expected to wait for the in-flight one to seal or fail.
            var inflight = await db.Deployments
                .Where(d => d.Status == DeploymentStatus.Dispatching
                         || d.Status == DeploymentStatus.AwaitingApplyAcks)
                .Select(d => d.DeploymentId)
                .FirstOrDefaultAsync();
            if (inflight != Guid.Empty)
            {
                replyTo.Tell(new StartDeploymentResult(
                    StartDeploymentOutcome.AnotherDeploymentInFlight,
                    DeploymentId: new DeploymentId(inflight),
                    RevisionHash: null,
                    Message: $"Deployment {inflight:N} is still in flight.",
                    msg.CorrelationId));
                return;
            }

            var artifact = await ConfigComposer.SnapshotAndFlattenAsync(db);
            var deploymentId = DeploymentId.NewId();
            var revHash = RevisionHash.Parse(artifact.RevisionHash);

            db.Deployments.Add(new Deployment
            {
                DeploymentId = deploymentId.Value,
                RevisionHash = artifact.RevisionHash,
                Status = DeploymentStatus.Dispatching,
                CreatedBy = msg.CreatedBy,
                ArtifactBlob = artifact.Blob,
            });

            // Marker ConfigEdit row so the audit timeline shows the deployment snapshot.
            // Serialized (not string-built) so the value is always properly JSON-escaped.
            db.ConfigEdits.Add(new ConfigEdit
            {
                EntityType = "Deployment",
                EntityId = deploymentId.Value,
                FieldsJson = System.Text.Json.JsonSerializer.Serialize(new
                {
                    revisionHash = artifact.RevisionHash,
                    sizeBytes = artifact.Blob.Length,
                }),
                EditedBy = msg.CreatedBy,
                SourceNode = SelfNodeHost,
            });

            await db.SaveChangesAsync();

            // Dispatch only after the rows are durable, so the coordinator never sees an
            // unrecorded deployment.
            _coordinator.Tell(new DispatchDeployment(deploymentId, revHash, msg.CorrelationId));

            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Accepted,
                deploymentId,
                revHash,
                Message: null,
                msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "StartDeployment failed for {CreatedBy}", msg.CreatedBy);
            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Rejected,
                DeploymentId: null,
                RevisionHash: null,
                Message: ex.Message,
                msg.CorrelationId));
        }
    }

    /// <summary>
    /// Runs the registered probe for <c>msg.DriverType</c> against <c>msg.ConfigJson</c>, with
    /// the timeout clamped to 1–60 seconds. Always replies with a <c>TestDriverConnectResult</c>.
    /// Elapsed milliseconds are reported only for a successful probe.
    /// </summary>
    private async Task HandleTestDriverConnectAsync(TestDriverConnect msg)
    {
        var replyTo = Sender;

        // Guard null explicitly: the dictionary lookup would otherwise throw outside the try
        // below, failing the handler without ever replying to the asker.
        if (msg.DriverType is null || !_probesByType.TryGetValue(msg.DriverType, out var probe))
        {
            replyTo.Tell(new TestDriverConnectResult(
                false, $"No probe registered for driver type '{msg.DriverType}'.", null, msg.CorrelationId));
            return;
        }

        var clampedSec = Math.Clamp(msg.TimeoutSeconds, 1, 60);
        var timeout = TimeSpan.FromSeconds(clampedSec);
        using var cts = new CancellationTokenSource(timeout);
        var sw = System.Diagnostics.Stopwatch.StartNew();
        try
        {
            var result = await probe.ProbeAsync(msg.ConfigJson, timeout, cts.Token);
            sw.Stop();
            replyTo.Tell(new TestDriverConnectResult(
                result.Ok,
                result.Message,
                result.Ok ? sw.Elapsed.TotalMilliseconds : (double?)null,
                msg.CorrelationId));
        }
        catch (OperationCanceledException) when (cts.IsCancellationRequested)
        {
            // Only our own timeout is reported as a timeout. Any other cancellation falls
            // through to the generic handler with its real message.
            replyTo.Tell(new TestDriverConnectResult(
                false, $"Probe timed out after {clampedSec}s.", null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "Probe for {DriverType} threw", msg.DriverType);
            replyTo.Tell(new TestDriverConnectResult(
                false, ex.Message, null, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Audits and then broadcasts a driver restart request. Replies <c>RestartDriverResult(true)</c>
    /// once dispatched. On failure it replies <c>false</c>, and nothing has been broadcast.
    /// </summary>
    private async Task HandleRestartDriverAsync(RestartDriver msg)
    {
        var replyTo = Sender;
        try
        {
            // Audit first: if persisting fails, the restart must not have been sent, so the
            // failure reply is truthful and a UI retry cannot trigger a second restart.
            await AuditDriverControlAsync("restart", msg.DriverInstanceId, msg.ActorByUserName);
            PublishDriverControl(msg);

            _log.Info("AdminOps: RestartDriver dispatched for {DriverInstanceId} by {User}",
                msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new RestartDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: RestartDriver failed for {DriverInstanceId}", msg.DriverInstanceId);
            replyTo.Tell(new RestartDriverResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Audits and then broadcasts a driver reconnect request. Replies <c>ReconnectDriverResult(true)</c>
    /// once dispatched. On failure it replies <c>false</c>, and nothing has been broadcast.
    /// </summary>
    private async Task HandleReconnectDriverAsync(ReconnectDriver msg)
    {
        var replyTo = Sender;
        try
        {
            // Same audit-then-dispatch ordering as restart (see HandleRestartDriverAsync).
            await AuditDriverControlAsync("reconnect", msg.DriverInstanceId, msg.ActorByUserName);
            PublishDriverControl(msg);

            _log.Info("AdminOps: ReconnectDriver dispatched for {DriverInstanceId} by {User}",
                msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new ReconnectDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: ReconnectDriver failed for {DriverInstanceId}", msg.DriverInstanceId);
            replyTo.Tell(new ReconnectDriverResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Persists a <c>ConfigEdit</c> audit row for a driver control operation.
    /// </summary>
    /// <param name="op">Operation name written to the audit JSON ("restart", "reconnect").</param>
    /// <param name="driverInstanceId">Target instance id; stored as <c>Guid.Empty</c> if not a GUID.</param>
    /// <param name="editedBy">User who requested the operation.</param>
    private async Task AuditDriverControlAsync(string op, string driverInstanceId, string editedBy)
    {
        await using var db = await _dbFactory.CreateDbContextAsync();
        db.ConfigEdits.Add(new ConfigEdit
        {
            EntityType = "DriverInstance",
            EntityId = Guid.TryParse(driverInstanceId, out var guid) ? guid : Guid.Empty,
            // Produces {"op":"...","driverInstanceId":"..."} with proper JSON escaping.
            FieldsJson = System.Text.Json.JsonSerializer.Serialize(new { op, driverInstanceId }),
            EditedBy = editedBy,
            SourceNode = SelfNodeHost,
        });
        await db.SaveChangesAsync();
    }

    /// <summary>
    /// Broadcasts a driver control message on the driver-control DistributedPubSub topic.
    /// Every DriverHostActor on every node receives it. Only the host that owns the instance
    /// acts; the others ignore it.
    /// </summary>
    private void PublishDriverControl(object message) =>
        DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DriverControlTopic.Name, message));
}