using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Configuration.Validation;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations;
/// <summary>
/// Cluster-singleton admin operations actor. Owns the "snapshot the live-edit state and start
/// a deployment" workflow plus (eventually) all mutating live-edit ops invoked by the admin UI.
/// Addressed from anywhere in the cluster.
/// </summary>
public sealed class AdminOperationsActor : ReceiveActor
{
    private readonly IDbContextFactory _dbFactory;
    private readonly IActorRef _coordinator;
    private readonly IReadOnlyDictionary _probesByType;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    /// <summary>Creates actor props for the admin operations actor.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">Driver probes registered in DI; keyed by DriverType (case-insensitive).</param>
    /// <returns>Props configured to create an <see cref="AdminOperationsActor"/>.</returns>
    public static Props Props(
        IDbContextFactory dbFactory,
        IActorRef coordinator,
        IEnumerable probes) =>
        Akka.Actor.Props.Create(() => new AdminOperationsActor(dbFactory, coordinator, probes));

    /// <summary>Initializes a new instance of the <see cref="AdminOperationsActor"/>.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">
    /// Driver probes registered in DI; keyed by DriverType (case-insensitive). Two probes whose
    /// DriverType differs only by case (or is identical) make construction throw, because the
    /// lookup is built with <c>ToDictionary</c>.
    /// </param>
    public AdminOperationsActor(
        IDbContextFactory dbFactory,
        IActorRef coordinator,
        IEnumerable probes)
    {
        _dbFactory = dbFactory;
        _coordinator = coordinator;
        _probesByType = probes.ToDictionary(p => p.DriverType, StringComparer.OrdinalIgnoreCase);

        ReceiveAsync<StartDeployment>(HandleStartDeploymentAsync);
        ReceiveAsync<TestDriverConnect>(HandleTestDriverConnectAsync);
        ReceiveAsync<RestartDriver>(HandleRestartDriverAsync);
        ReceiveAsync<ReconnectDriver>(HandleReconnectDriverAsync);
        Receive<AcknowledgeAlarmCommand>(HandleAcknowledgeAlarm);
        Receive<ShelveAlarmCommand>(HandleShelveAlarm);
    }

    /// <summary>Publishes <paramref name="message"/> on a cluster distributed-pub-sub topic.</summary>
    /// <param name="topic">Topic name to publish on.</param>
    /// <param name="message">Message broadcast to every subscriber of the topic.</param>
    private void PublishToTopic(string topic, object message) =>
        DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(topic, message));

    /// <summary>Host of this node's cluster address, or "unknown" when it has none; recorded on audit rows.</summary>
    private string SelfHostName() =>
        Akka.Cluster.Cluster.Get(Context.System).SelfAddress.Host ?? "unknown";

    /// <summary>
    /// AdminUI Acknowledge path. Maps the control-plane command to an <see cref="AlarmCommand"/>
    /// (Operation = "Acknowledge") and publishes it onto the cluster alarm-commands topic. The
    /// broadcast lands on every driver node's ScriptedAlarmHostActor; only the node owning the
    /// alarm acts (ownership filter), so the admin singleton needs no knowledge of placement.
    /// Synchronous — the publish is fire-and-forget via the mediator, so there is no awaitable
    /// work and no DB write.
    /// </summary>
    /// <param name="msg">The acknowledge request; the sender receives an <see cref="AcknowledgeAlarmResult"/>.</param>
    private void HandleAcknowledgeAlarm(AcknowledgeAlarmCommand msg)
    {
        var replyTo = Sender;
        try
        {
            var alarmCmd = new AlarmCommand(
                AlarmId: msg.AlarmId,
                Operation: "Acknowledge",
                User: msg.User,
                Comment: msg.Comment,
                UnshelveAtUtc: null);
            PublishToTopic(AlarmCommandsTopic.Name, alarmCmd);
            _log.Info("AdminOps: Acknowledge published for alarm {AlarmId} by {User}", msg.AlarmId, msg.User);
            replyTo.Tell(new AcknowledgeAlarmResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: Acknowledge failed for alarm {AlarmId}", msg.AlarmId);
            replyTo.Tell(new AcknowledgeAlarmResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// AdminUI Shelve / Unshelve path. Maps a <see cref="ShelveAlarmCommand"/> to the matching
    /// <see cref="AlarmCommand"/> operation (OneShotShelve / TimedShelve / Unshelve), threading
    /// <c>UnshelveAtUtc</c> through for the timed kind only, and publishes it onto the cluster
    /// alarm-commands topic. Ownership filtering happens on the owning node exactly as for Acknowledge.
    /// </summary>
    /// <param name="msg">The shelve request; the sender receives a <see cref="ShelveAlarmResult"/>.</param>
    private void HandleShelveAlarm(ShelveAlarmCommand msg)
    {
        var replyTo = Sender;
        try
        {
            var (operation, unshelveAt) = msg.Kind switch
            {
                ShelveKind.OneShot => ("OneShotShelve", (DateTime?)null),
                ShelveKind.Timed => ("TimedShelve", msg.UnshelveAtUtc),
                ShelveKind.Unshelve => ("Unshelve", (DateTime?)null),
                _ => throw new ArgumentOutOfRangeException(nameof(msg), msg.Kind, "Unknown shelve kind."),
            };

            // TimedShelve requires an unshelve instant — the engine rejects it otherwise. Guard here so
            // the AdminUI gets an immediate, attributable failure instead of a silently-dropped command.
            if (msg.Kind == ShelveKind.Timed && unshelveAt is null)
            {
                replyTo.Tell(new ShelveAlarmResult(false, "TimedShelve requires UnshelveAtUtc.", msg.CorrelationId));
                return;
            }

            var alarmCmd = new AlarmCommand(
                AlarmId: msg.AlarmId,
                Operation: operation,
                User: msg.User,
                Comment: msg.Comment,
                UnshelveAtUtc: unshelveAt);
            PublishToTopic(AlarmCommandsTopic.Name, alarmCmd);
            _log.Info("AdminOps: {Operation} published for alarm {AlarmId} by {User}", operation, msg.AlarmId, msg.User);
            replyTo.Tell(new ShelveAlarmResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: Shelve ({Kind}) failed for alarm {AlarmId}", msg.Kind, msg.AlarmId);
            replyTo.Tell(new ShelveAlarmResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Validates the current config, snapshots it into a sealed artifact, records a
    /// <see cref="Deployment"/> row (status Dispatching) plus an audit <see cref="ConfigEdit"/>,
    /// and hands the deployment to the coordinator.
    /// </summary>
    /// <param name="msg">
    /// The start request. The sender receives a <see cref="StartDeploymentResult"/> with one of these outcomes:
    /// AnotherDeploymentInFlight (a Dispatching/AwaitingApplyAcks deployment exists),
    /// Rejected (validation errors or any failure while sealing/saving), or Accepted
    /// (Message carries the optional compile-cost advisory).
    /// </param>
    private async Task HandleStartDeploymentAsync(StartDeployment msg)
    {
        var replyTo = Sender;
        try
        {
            await using var db = await _dbFactory.CreateDbContextAsync();

            // Refuse if any deployment is already in flight — keeps the coordinator's state
            // unambiguous. The UI is expected to wait for the in-flight one to seal or fail.
            var inflight = await db.Deployments
                .Where(d => d.Status == DeploymentStatus.Dispatching || d.Status == DeploymentStatus.AwaitingApplyAcks)
                .Select(d => d.DeploymentId)
                .FirstOrDefaultAsync();
            if (inflight != Guid.Empty)
            {
                replyTo.Tell(new StartDeploymentResult(
                    StartDeploymentOutcome.AnotherDeploymentInFlight,
                    DeploymentId: new DeploymentId(inflight),
                    RevisionHash: null,
                    Message: $"Deployment {inflight:N} is still in flight.",
                    msg.CorrelationId));
                return;
            }

            // Full pre-seal gate: run the complete DraftValidator (one pass, all rules) and reject
            // on ANY validation error. The config is now canonical — the company overlay loader emits
            // canonical EquipmentIds and the seed is clean — so every rule (UNS segments, EquipmentId
            // derivation, cross-cluster/namespace binding, driver-namespace compat, signal collisions,
            // …) gates the deploy. A green build in this repo does not prove the config is valid; this
            // is the last guard before a bad address space (or a non-derived EquipmentId) ships.
            // NOTE: a BadDuplicateExternalIdentifier rejection from the reservation pre-flight can
            // (rarely) be a false positive if an external-id reservation release has not yet been
            // committed/visible when the snapshot is read — operators seeing a spurious one should
            // check ExternalIdReservation state before re-submitting.
            var draft = await DraftSnapshotFactory.FromConfigDbAsync(db);
            var errors = DraftValidator.Validate(draft);
            if (errors.Count > 0)
            {
                var summary = string.Join("; ", errors.Select(e => $"[{e.Code}] {e.Message}"));
                _log.Warning("StartDeployment rejected ({Count} validation error(s)): {Summary}",
                    errors.Count, summary);
                replyTo.Tell(new StartDeploymentResult(
                    StartDeploymentOutcome.Rejected,
                    DeploymentId: null,
                    RevisionHash: null,
                    Message: summary,
                    msg.CorrelationId));
                return;
            }

            // Warn-only compile-cost guardrail (NEVER blocks). Each genuinely-compiled (non-passthrough)
            // script costs ~1.66 MiB RSS per node to materialize via Roslyn (measured post-A0; design doc
            // 2026-06-07). Passthrough "mirror" scripts compile to ~nothing, so they are excluded; distinct
            // sources are counted (the compile cache keys on source, so duplicates collapse to one unit).
            // This only surfaces the estimate to the operator — the DraftValidator gate above is the hard
            // reject; this advisory rides in the Accepted Message so the UI can show it.
            // advisory is declared outside the inner try so the Accepted reply below can still read it even
            // when the guardrail query throws (e.g. transient SQL). A failed estimate must NEVER block a
            // valid deploy — the outer catch is reserved for genuine seal/save failures.
            string? advisory = null;
            try
            {
                const double PerScriptMiB = 1.66; // measured post-A0 per-script RSS (design doc 2026-06-07)
                var scriptSources = await db.Scripts.AsNoTracking().Select(s => s.SourceCode).ToListAsync();
                var compiled = scriptSources
                    .Where(src => !PassthroughScript.TryMatch(src, out _))
                    .Distinct(StringComparer.Ordinal)
                    .Count();
                if (compiled > 0)
                {
                    var estMiB = compiled * PerScriptMiB;
                    _log.Warning(
                        "StartDeployment: {Compiled} script(s) will compile (~{EstMiB:F0} MiB RSS per node); ensure node mem_limit covers it",
                        compiled, estMiB);
                    advisory = $"{compiled} script(s) will compile (~{estMiB:F0} MiB/node)";
                }
            }
            catch (Exception ex)
            {
                // Guardrail is advisory-only — a failed estimate must never block a valid deploy.
                // advisory is only assigned after the estimate fully succeeds, so it is still null here.
                _log.Warning(ex, "StartDeployment: script compile-cost estimate failed; advisory skipped");
            }

            var artifact = await ConfigComposer.SnapshotAndFlattenAsync(db);
            var deploymentId = DeploymentId.NewId();
            var revHash = RevisionHash.Parse(artifact.RevisionHash);

            db.Deployments.Add(new Deployment
            {
                DeploymentId = deploymentId.Value,
                RevisionHash = artifact.RevisionHash,
                Status = DeploymentStatus.Dispatching,
                CreatedBy = msg.CreatedBy,
                ArtifactBlob = artifact.Blob,
            });

            // Marker ConfigEdit row so the audit timeline shows the deployment snapshot. The hash is
            // JSON-encoded (same as the driver-control audit rows) so the payload stays valid JSON
            // whatever characters the hash string contains.
            db.ConfigEdits.Add(new ConfigEdit
            {
                EntityType = "Deployment",
                EntityId = deploymentId.Value,
                FieldsJson = $"{{\"revisionHash\":{System.Text.Json.JsonSerializer.Serialize(artifact.RevisionHash)},\"sizeBytes\":{artifact.Blob.Length}}}",
                EditedBy = msg.CreatedBy,
                SourceNode = SelfHostName(),
            });
            await db.SaveChangesAsync();

            _coordinator.Tell(new DispatchDeployment(deploymentId, revHash, msg.CorrelationId));
            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Accepted,
                deploymentId,
                revHash,
                Message: advisory,
                msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "StartDeployment failed for {CreatedBy}", msg.CreatedBy);
            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Rejected,
                DeploymentId: null,
                RevisionHash: null,
                Message: ex.Message,
                msg.CorrelationId));
        }
    }

    /// <summary>
    /// Runs the registered probe for <c>msg.DriverType</c> against <c>msg.ConfigJson</c> with a
    /// timeout clamped to 1–60 seconds, and replies with a <see cref="TestDriverConnectResult"/>.
    /// The elapsed time is only reported on success.
    /// </summary>
    /// <param name="msg">The probe request.</param>
    private async Task HandleTestDriverConnectAsync(TestDriverConnect msg)
    {
        var replyTo = Sender;

        // A null driver type would make the dictionary lookup throw outside any try, crashing
        // the actor with no reply to the asker; treat it like any other unregistered type.
        if (string.IsNullOrWhiteSpace(msg.DriverType) || !_probesByType.TryGetValue(msg.DriverType, out var probe))
        {
            replyTo.Tell(new TestDriverConnectResult(
                false,
                $"No probe registered for driver type '{msg.DriverType}'.",
                null,
                msg.CorrelationId));
            return;
        }

        var clampedSec = Math.Clamp(msg.TimeoutSeconds, 1, 60);
        var timeout = TimeSpan.FromSeconds(clampedSec);
        using var cts = new CancellationTokenSource(timeout);
        var sw = System.Diagnostics.Stopwatch.StartNew();
        try
        {
            var result = await probe.ProbeAsync(msg.ConfigJson, timeout, cts.Token);
            sw.Stop();
            replyTo.Tell(new TestDriverConnectResult(
                result.Ok,
                result.Message,
                result.Ok ? sw.Elapsed.TotalMilliseconds : (double?)null,
                msg.CorrelationId));
        }
        catch (OperationCanceledException) when (cts.IsCancellationRequested)
        {
            // Only our own timeout token is reported as a timeout; a cancellation raised
            // internally by the probe falls through to the general handler with its real message.
            replyTo.Tell(new TestDriverConnectResult(
                false,
                $"Probe timed out after {clampedSec}s.",
                null,
                msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "Probe for {DriverType} threw", msg.DriverType);
            replyTo.Tell(new TestDriverConnectResult(
                false,
                ex.Message,
                null,
                msg.CorrelationId));
        }
    }

    /// <summary>
    /// Broadcasts a restart request on the driver-control topic, then writes an audit
    /// <see cref="ConfigEdit"/> row. Replies with a <see cref="RestartDriverResult"/>.
    /// </summary>
    /// <param name="msg">The restart request.</param>
    private async Task HandleRestartDriverAsync(RestartDriver msg)
    {
        var replyTo = Sender;
        var dispatched = false;
        try
        {
            // Broadcast to every DriverHostActor on every node via the driver-control DPS topic.
            // Only the host that owns the instance will act; others ignore it (id not found in _children).
            PublishToTopic(DriverControlTopic.Name, msg);
            dispatched = true;

            await using var db = await _dbFactory.CreateDbContextAsync();
            db.ConfigEdits.Add(NewDriverControlAudit("restart", msg.DriverInstanceId, msg.ActorByUserName));
            await db.SaveChangesAsync();

            _log.Info("AdminOps: RestartDriver dispatched for {DriverInstanceId} by {User}",
                msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new RestartDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: RestartDriver failed for {DriverInstanceId} (dispatched={Dispatched})",
                msg.DriverInstanceId, dispatched);
            replyTo.Tell(new RestartDriverResult(false, DescribeDriverControlFailure(ex, dispatched), msg.CorrelationId));
        }
    }

    /// <summary>
    /// Broadcasts a reconnect request on the driver-control topic, then writes an audit
    /// <see cref="ConfigEdit"/> row. Replies with a <see cref="ReconnectDriverResult"/>.
    /// </summary>
    /// <param name="msg">The reconnect request.</param>
    private async Task HandleReconnectDriverAsync(ReconnectDriver msg)
    {
        var replyTo = Sender;
        var dispatched = false;
        try
        {
            // Broadcast to every DriverHostActor; only the one owning the instance reacts.
            PublishToTopic(DriverControlTopic.Name, msg);
            dispatched = true;

            await using var db = await _dbFactory.CreateDbContextAsync();
            db.ConfigEdits.Add(NewDriverControlAudit("reconnect", msg.DriverInstanceId, msg.ActorByUserName));
            await db.SaveChangesAsync();

            _log.Info("AdminOps: ReconnectDriver dispatched for {DriverInstanceId} by {User}",
                msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new ReconnectDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: ReconnectDriver failed for {DriverInstanceId} (dispatched={Dispatched})",
                msg.DriverInstanceId, dispatched);
            replyTo.Tell(new ReconnectDriverResult(false, DescribeDriverControlFailure(ex, dispatched), msg.CorrelationId));
        }
    }

    /// <summary>
    /// Builds the audit row for a driver-control operation.
    /// <c>FieldsJson</c> is <c>{"op":"…","driverInstanceId":…}</c>, with the id JSON-encoded.
    /// </summary>
    /// <param name="op">Operation name recorded in the payload ("restart" / "reconnect").</param>
    /// <param name="driverInstanceId">Target instance id; stored as EntityId when it parses as a Guid, else Guid.Empty.</param>
    /// <param name="editedBy">User who requested the operation.</param>
    private ConfigEdit NewDriverControlAudit(string op, string driverInstanceId, string editedBy) => new ConfigEdit
    {
        EntityType = "DriverInstance",
        EntityId = Guid.TryParse(driverInstanceId, out var guid) ? guid : Guid.Empty,
        FieldsJson = $"{{\"op\":\"{op}\",\"driverInstanceId\":{System.Text.Json.JsonSerializer.Serialize(driverInstanceId)}}}",
        EditedBy = editedBy,
        SourceNode = SelfHostName(),
    };

    /// <summary>
    /// Failure text for a driver-control reply. When the command was already published, the
    /// failure can only be the audit write, so the message says so. The operator then knows
    /// the restart/reconnect went out and does not need to be retried.
    /// </summary>
    private static string DescribeDriverControlFailure(Exception ex, bool dispatched) =>
        dispatched
            ? $"Command was dispatched, but its audit record could not be written: {ex.Message}"
            : ex.Message;
}