using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Configuration.Validation;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;

namespace ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations;

/// <summary>
/// Cluster-singleton admin operations actor. Owns the "snapshot the live-edit state and start
/// a deployment" workflow plus (eventually) all mutating live-edit ops invoked by the admin UI.
/// Routed to from anywhere in the cluster.
/// </summary>
public sealed class AdminOperationsActor : ReceiveActor
{
    private readonly IDbContextFactory _dbFactory;
    private readonly IActorRef _coordinator;
    private readonly IReadOnlyDictionary _probesByType;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    /// <summary>Creates actor props for the admin operations actor.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">Driver probes registered in DI; keyed by DriverType (case-insensitive).</param>
    /// <returns>Props configured to create an AdminOperationsActor.</returns>
    public static Props Props(
        IDbContextFactory dbFactory,
        IActorRef coordinator,
        IEnumerable probes) =>
        Akka.Actor.Props.Create(() => new AdminOperationsActor(dbFactory, coordinator, probes));

    /// <summary>Initializes a new instance of the AdminOperationsActor.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">
    /// Driver probes registered in DI; keyed by DriverType (case-insensitive). Two probes with the
    /// same DriverType make <c>ToDictionary</c> throw, so the actor fails to construct.
    /// </param>
    public AdminOperationsActor(
        IDbContextFactory dbFactory,
        IActorRef coordinator,
        IEnumerable probes)
    {
        _dbFactory = dbFactory;
        _coordinator = coordinator;
        _probesByType = probes.ToDictionary(p => p.DriverType, StringComparer.OrdinalIgnoreCase);

        ReceiveAsync(HandleStartDeploymentAsync);
        ReceiveAsync(HandleTestDriverConnectAsync);
        ReceiveAsync(HandleRestartDriverAsync);
        ReceiveAsync(HandleReconnectDriverAsync);
        Receive(HandleAcknowledgeAlarm);
        Receive(HandleShelveAlarm);
    }

    /// <summary>
    /// Publishes <paramref name="message"/> on the given cluster pub/sub topic via the
    /// DistributedPubSub mediator. Fire-and-forget: there is no acknowledgement to await.
    /// </summary>
    /// <param name="topic">Pub/sub topic name.</param>
    /// <param name="message">Payload to broadcast to every subscriber of <paramref name="topic"/>.</param>
    private void PublishToCluster(string topic, object message) =>
        DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(topic, message));

    /// <summary>
    /// Host part of this node's cluster self-address, or <c>"unknown"</c> when the address has no
    /// host. Used as the <c>SourceNode</c> of audit rows.
    /// </summary>
    private string SelfHostName() =>
        Akka.Cluster.Cluster.Get(Context.System).SelfAddress.Host ?? "unknown";

    /// <summary>
    /// AdminUI Acknowledge path. Maps the control-plane command to an <c>AlarmCommand</c>
    /// (Operation = "Acknowledge") and publishes it onto the cluster alarm-commands topic. The
    /// broadcast lands on every driver node's ScriptedAlarmHostActor. Only the node owning the alarm
    /// acts (ownership filter), so the admin singleton needs no knowledge of placement.
    /// Synchronous: the publish is fire-and-forget via the mediator, with no awaitable work and
    /// no DB write.
    /// </summary>
    /// <param name="msg">The acknowledge request; its CorrelationId is echoed in the reply.</param>
    private void HandleAcknowledgeAlarm(AcknowledgeAlarmCommand msg)
    {
        var replyTo = Sender;
        try
        {
            var alarmCmd = new AlarmCommand(
                AlarmId: msg.AlarmId,
                Operation: "Acknowledge",
                User: msg.User,
                Comment: msg.Comment,
                UnshelveAtUtc: null);
            PublishToCluster(AlarmCommandsTopic.Name, alarmCmd);
            _log.Info("AdminOps: Acknowledge published for alarm {AlarmId} by {User}", msg.AlarmId, msg.User);
            replyTo.Tell(new AcknowledgeAlarmResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: Acknowledge failed for alarm {AlarmId}", msg.AlarmId);
            replyTo.Tell(new AcknowledgeAlarmResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// AdminUI Shelve / Unshelve path. Maps <c>msg.Kind</c> to the matching <c>AlarmCommand</c>
    /// operation (OneShotShelve / TimedShelve / Unshelve). For the timed kind it threads
    /// <c>msg.UnshelveAtUtc</c> through. It then publishes onto the cluster alarm-commands topic.
    /// Ownership filtering happens on the owning node exactly as for Acknowledge.
    /// </summary>
    /// <param name="msg">The shelve request; its CorrelationId is echoed in the reply.</param>
    private void HandleShelveAlarm(ShelveAlarmCommand msg)
    {
        var replyTo = Sender;
        try
        {
            var (operation, unshelveAt) = msg.Kind switch
            {
                ShelveKind.OneShot => ("OneShotShelve", (DateTime?)null),
                ShelveKind.Timed => ("TimedShelve", msg.UnshelveAtUtc),
                ShelveKind.Unshelve => ("Unshelve", (DateTime?)null),
                _ => throw new ArgumentOutOfRangeException(nameof(msg), msg.Kind, "Unknown shelve kind."),
            };

            // TimedShelve requires an unshelve instant, and the engine rejects it otherwise. Guard
            // here so the AdminUI gets an immediate, attributable failure instead of a
            // silently-dropped command.
            if (msg.Kind == ShelveKind.Timed && unshelveAt is null)
            {
                replyTo.Tell(new ShelveAlarmResult(false, "TimedShelve requires UnshelveAtUtc.", msg.CorrelationId));
                return;
            }

            var alarmCmd = new AlarmCommand(
                AlarmId: msg.AlarmId,
                Operation: operation,
                User: msg.User,
                Comment: msg.Comment,
                UnshelveAtUtc: unshelveAt);
            PublishToCluster(AlarmCommandsTopic.Name, alarmCmd);
            _log.Info("AdminOps: {Operation} published for alarm {AlarmId} by {User}", operation, msg.AlarmId, msg.User);
            replyTo.Tell(new ShelveAlarmResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: Shelve ({Kind}) failed for alarm {AlarmId}", msg.Kind, msg.AlarmId);
            replyTo.Tell(new ShelveAlarmResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Validates the current live-edit state, snapshots it into a deployment artifact, persists a
    /// Deployment row (status Dispatching) plus an audit ConfigEdit, then hands the deployment to
    /// the coordinator.
    /// </summary>
    /// <remarks>
    /// Replies with one <c>StartDeploymentResult</c>, whose outcome is one of:
    /// <list type="bullet">
    /// <item><c>AnotherDeploymentInFlight</c> when a Dispatching/AwaitingApplyAcks deployment exists.</item>
    /// <item><c>Rejected</c> on any DraftValidator error or any exception.</item>
    /// <item><c>Accepted</c> otherwise. Its Message carries the optional compile-cost advisory.</item>
    /// </list>
    /// </remarks>
    /// <param name="msg">The deployment request; CreatedBy is recorded on the rows written.</param>
    private async Task HandleStartDeploymentAsync(StartDeployment msg)
    {
        var replyTo = Sender;
        try
        {
            await using var db = await _dbFactory.CreateDbContextAsync();

            // Refuse if any deployment is already in flight. This keeps the coordinator's state
            // unambiguous. The UI is expected to wait for the in-flight one to seal or fail.
            var inflight = await db.Deployments
                .Where(d => d.Status == DeploymentStatus.Dispatching || d.Status == DeploymentStatus.AwaitingApplyAcks)
                .Select(d => d.DeploymentId)
                .FirstOrDefaultAsync();
            if (inflight != Guid.Empty)
            {
                replyTo.Tell(new StartDeploymentResult(
                    StartDeploymentOutcome.AnotherDeploymentInFlight,
                    DeploymentId: new DeploymentId(inflight),
                    RevisionHash: null,
                    Message: $"Deployment {inflight:N} is still in flight.",
                    msg.CorrelationId));
                return;
            }

            // Full pre-seal gate: run the complete DraftValidator (one pass, all rules) and reject
            // on ANY validation error. The config is now canonical: the company overlay loader emits
            // canonical EquipmentIds and the seed is clean. So every rule gates the deploy (UNS
            // segments, EquipmentId derivation, cross-cluster/namespace binding, driver-namespace
            // compat, signal collisions, …). A green build in this repo does not prove the config is
            // valid. This is the last guard before a bad address space (or a non-derived
            // EquipmentId) ships.
            // NOTE: a BadDuplicateExternalIdentifier rejection from the reservation pre-flight can
            // (rarely) be a false positive. That happens if an external-id reservation release has
            // not yet been committed/visible when the snapshot is read. Operators seeing a spurious
            // one should check ExternalIdReservation state before re-submitting.
            var draft = await DraftSnapshotFactory.FromConfigDbAsync(db);
            var errors = DraftValidator.Validate(draft);
            if (errors.Count > 0)
            {
                var summary = string.Join("; ", errors.Select(e => $"[{e.Code}] {e.Message}"));
                _log.Warning("StartDeployment rejected ({Count} validation error(s)): {Summary}", errors.Count, summary);
                replyTo.Tell(new StartDeploymentResult(
                    StartDeploymentOutcome.Rejected,
                    DeploymentId: null,
                    RevisionHash: null,
                    Message: summary,
                    msg.CorrelationId));
                return;
            }

            // Warn-only compile-cost guardrail (NEVER blocks).
            // - Each genuinely-compiled (non-passthrough) script costs ~1.66 MiB RSS per node to
            //   materialize via Roslyn (measured post-A0; design doc 2026-06-07).
            // - Passthrough "mirror" scripts compile to ~nothing, so they are excluded.
            // - Distinct sources are counted, because the compile cache keys on source and
            //   duplicates collapse to one unit.
            // The DraftValidator gate above is the hard reject. This estimate only rides in the
            // Accepted Message so the UI can show it. `advisory` is declared outside the inner try
            // so the Accepted reply below can still read it even when the guardrail query throws
            // (e.g. transient SQL). A failed estimate must NEVER block a valid deploy; the outer
            // catch is reserved for genuine seal/save failures.
            string? advisory = null;
            try
            {
                const double PerScriptMiB = 1.66; // measured post-A0 per-script RSS (design doc 2026-06-07)
                var scriptSources = await db.Scripts.AsNoTracking().Select(s => s.SourceCode).ToListAsync();
                var compiled = scriptSources
                    .Where(src => !PassthroughScript.TryMatch(src, out _))
                    .Distinct(StringComparer.Ordinal)
                    .Count();
                if (compiled > 0)
                {
                    var estMiB = compiled * PerScriptMiB;
                    _log.Warning(
                        "StartDeployment: {Compiled} script(s) will compile (~{EstMiB:F0} MiB RSS per node); ensure node mem_limit covers it",
                        compiled, estMiB);
                    advisory = $"{compiled} script(s) will compile (~{estMiB:F0} MiB/node)";
                }
            }
            catch (Exception ex)
            {
                // Guardrail is advisory-only: a failed estimate must never block a valid deploy.
                _log.Warning(ex, "StartDeployment: script compile-cost estimate failed; advisory skipped");
                advisory = null;
            }

            var artifact = await ConfigComposer.SnapshotAndFlattenAsync(db);
            var deploymentId = DeploymentId.NewId();
            var revHash = RevisionHash.Parse(artifact.RevisionHash);

            db.Deployments.Add(new Deployment
            {
                DeploymentId = deploymentId.Value,
                RevisionHash = artifact.RevisionHash,
                Status = DeploymentStatus.Dispatching,
                CreatedBy = msg.CreatedBy,
                ArtifactBlob = artifact.Blob,
            });

            // Marker ConfigEdit row so the audit timeline shows the deployment snapshot. The hash is
            // JSON-encoded (as the driver-op audit rows do) so the row stays valid JSON whatever
            // characters the hash string contains.
            db.ConfigEdits.Add(new ConfigEdit
            {
                EntityType = "Deployment",
                EntityId = deploymentId.Value,
                FieldsJson = $"{{\"revisionHash\":{System.Text.Json.JsonSerializer.Serialize(artifact.RevisionHash)},\"sizeBytes\":{artifact.Blob.Length}}}",
                EditedBy = msg.CreatedBy,
                SourceNode = SelfHostName(),
            });

            await db.SaveChangesAsync();

            _coordinator.Tell(new DispatchDeployment(deploymentId, revHash, msg.CorrelationId));
            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Accepted,
                deploymentId,
                revHash,
                Message: advisory,
                msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "StartDeployment failed for {CreatedBy}", msg.CreatedBy);
            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Rejected,
                DeploymentId: null,
                RevisionHash: null,
                Message: ex.Message,
                msg.CorrelationId));
        }
    }

    /// <summary>
    /// Runs the registered probe for <c>msg.DriverType</c> against <c>msg.ConfigJson</c>.
    /// The timeout is clamped to 1–60 s. Replies with a <c>TestDriverConnectResult</c>, which
    /// carries the elapsed milliseconds only when the probe reports success.
    /// </summary>
    /// <param name="msg">The probe request; its CorrelationId is echoed in the reply.</param>
    private async Task HandleTestDriverConnectAsync(TestDriverConnect msg)
    {
        var replyTo = Sender;

        // This lookup sits outside the try below, and Dictionary.TryGetValue throws on a null key.
        // A null DriverType would therefore restart the actor and leave the caller without a reply.
        // Treat it as "no probe".
        if (msg.DriverType is null || !_probesByType.TryGetValue(msg.DriverType, out var probe))
        {
            replyTo.Tell(new TestDriverConnectResult(
                false, $"No probe registered for driver type '{msg.DriverType}'.", null, msg.CorrelationId));
            return;
        }

        var clampedSec = Math.Clamp(msg.TimeoutSeconds, 1, 60);
        var timeout = TimeSpan.FromSeconds(clampedSec);
        using var cts = new CancellationTokenSource(timeout);
        var sw = System.Diagnostics.Stopwatch.StartNew();
        try
        {
            // NOTE(review): the timeout is cooperative; the probe must honour cts.Token. A probe
            // that ignores it keeps this handler (and, with Akka.NET ReceiveAsync, the singleton's
            // mailbox) waiting. Consider a hard bound once ProbeAsync's return type is confirmed.
            var result = await probe.ProbeAsync(msg.ConfigJson, timeout, cts.Token);
            sw.Stop();
            replyTo.Tell(new TestDriverConnectResult(
                result.Ok,
                result.Message,
                result.Ok ? sw.Elapsed.TotalMilliseconds : (double?)null,
                msg.CorrelationId));
        }
        catch (OperationCanceledException) when (cts.IsCancellationRequested)
        {
            // Only report a timeout when our own deadline fired. Any other cancellation falls
            // through to the general error path below with its real message.
            replyTo.Tell(new TestDriverConnectResult(
                false, $"Probe timed out after {clampedSec}s.", null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "Probe for {DriverType} threw", msg.DriverType);
            replyTo.Tell(new TestDriverConnectResult(
                false, ex.Message, null, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Broadcasts a restart request for one driver instance and records an audit row.
    /// </summary>
    /// <param name="msg">The restart request; its CorrelationId is echoed in the reply.</param>
    private async Task HandleRestartDriverAsync(RestartDriver msg)
    {
        var replyTo = Sender;
        try
        {
            await BroadcastDriverControlAndAuditAsync(msg, "restart", msg.DriverInstanceId, msg.ActorByUserName);
            _log.Info("AdminOps: RestartDriver dispatched for {DriverInstanceId} by {User}", msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new RestartDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: RestartDriver failed for {DriverInstanceId}", msg.DriverInstanceId);
            replyTo.Tell(new RestartDriverResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Broadcasts a reconnect request for one driver instance and records an audit row.
    /// </summary>
    /// <param name="msg">The reconnect request; its CorrelationId is echoed in the reply.</param>
    private async Task HandleReconnectDriverAsync(ReconnectDriver msg)
    {
        var replyTo = Sender;
        try
        {
            await BroadcastDriverControlAndAuditAsync(msg, "reconnect", msg.DriverInstanceId, msg.ActorByUserName);
            _log.Info("AdminOps: ReconnectDriver dispatched for {DriverInstanceId} by {User}", msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new ReconnectDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: ReconnectDriver failed for {DriverInstanceId}", msg.DriverInstanceId);
            replyTo.Tell(new ReconnectDriverResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Shared body of the Restart / Reconnect paths. First it broadcasts
    /// <paramref name="command"/> unchanged on the driver-control topic. Then it appends a
    /// "DriverInstance" ConfigEdit audit row.
    /// </summary>
    /// <remarks>
    /// The broadcast happens before the audit write. If the audit write throws, the caller replies
    /// with a failure even though the command has already gone out to the nodes.
    /// </remarks>
    /// <param name="command">Control message to publish (RestartDriver / ReconnectDriver).</param>
    /// <param name="op">Operation name written into the audit FieldsJson ("restart" / "reconnect").</param>
    /// <param name="driverInstanceId">Target instance id; audited as <c>Guid.Empty</c> if it is not a GUID.</param>
    /// <param name="editedBy">User recorded as EditedBy on the audit row.</param>
    private async Task BroadcastDriverControlAndAuditAsync(object command, string op, string driverInstanceId, string editedBy)
    {
        // Every DriverHostActor on every node receives this. Only the host that owns the instance
        // acts; the others ignore it (id not found in their children).
        PublishToCluster(DriverControlTopic.Name, command);

        await using var db = await _dbFactory.CreateDbContextAsync();
        db.ConfigEdits.Add(new ConfigEdit
        {
            EntityType = "DriverInstance",
            EntityId = Guid.TryParse(driverInstanceId, out var instanceGuid) ? instanceGuid : Guid.Empty,
            FieldsJson = $"{{\"op\":\"{op}\",\"driverInstanceId\":{System.Text.Json.JsonSerializer.Serialize(driverInstanceId)}}}",
            EditedBy = editedBy,
            SourceNode = SelfHostName(),
        });
        await db.SaveChangesAsync();
    }
}