92d1df88f4
Wrap the script compile-cost guardrail block in its own inner try/catch so that a transient SQL failure in ToListAsync cannot fall through to the outer catch and produce a Rejected reply for an otherwise-valid deploy. `advisory` is declared in the outer scope so the Accepted StartDeploymentResult still carries it as its Message on the happy path; on failure, the inner catch logs a Warning and leaves `advisory` null.
297 lines
14 KiB
C#
297 lines
14 KiB
C#
using Akka.Actor;
|
|
using Akka.Cluster.Tools.PublishSubscribe;
|
|
using Akka.Event;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Validation;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations;
|
|
|
|
/// <summary>
/// Cluster-singleton admin operations actor. Owns the "snapshot the live-edit state and start
/// a deployment" workflow plus (eventually) all mutating live-edit ops invoked by the admin UI.
/// Routed to via <see cref="Commons.Interfaces.IAdminOperationsClient"/> from anywhere in the cluster.
/// </summary>
/// <remarks>
/// Every handler is registered with <c>ReceiveAsync</c>, which in Akka.NET holds back the next
/// message until the handler's task completes, so handlers never interleave. Each handler captures
/// <c>Sender</c> before its first <c>await</c> and answers the caller with a result message instead
/// of letting an exception escape (an escaped exception would restart the actor and leave the
/// caller's Ask unanswered).
/// </remarks>
public sealed class AdminOperationsActor : ReceiveActor
{
    /// <summary>
    /// Measured post-A0 RSS cost, per node, of one genuinely-compiled (non-passthrough) script
    /// (design doc 2026-06-07). Used only for the warn-only compile-cost advisory.
    /// </summary>
    private const double PerCompiledScriptMiB = 1.66;

    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
    private readonly IActorRef _coordinator;
    private readonly IReadOnlyDictionary<string, IDriverProbe> _probesByType;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    /// <summary>Creates actor props for the admin operations actor.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">Driver probes registered in DI; keyed by DriverType (case-insensitive).</param>
    /// <returns>Props configured to create an AdminOperationsActor.</returns>
    public static Props Props(
        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
        IActorRef coordinator,
        IEnumerable<IDriverProbe> probes) =>
        Akka.Actor.Props.Create(() => new AdminOperationsActor(dbFactory, coordinator, probes));

    /// <summary>Initializes a new instance of the AdminOperationsActor.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">Driver probes registered in DI; keyed by DriverType (case-insensitive).</param>
    /// <exception cref="ArgumentException">Two probes share the same DriverType (case-insensitive).</exception>
    public AdminOperationsActor(
        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
        IActorRef coordinator,
        IEnumerable<IDriverProbe> probes)
    {
        _dbFactory = dbFactory;
        _coordinator = coordinator;
        _probesByType = probes.ToDictionary(p => p.DriverType, StringComparer.OrdinalIgnoreCase);

        ReceiveAsync<StartDeployment>(HandleStartDeploymentAsync);
        ReceiveAsync<TestDriverConnect>(HandleTestDriverConnectAsync);
        ReceiveAsync<RestartDriver>(HandleRestartDriverAsync);
        ReceiveAsync<ReconnectDriver>(HandleReconnectDriverAsync);
    }

    /// <summary>
    /// Validates the live-edit state, snapshots it into a deployment artifact, persists the
    /// <see cref="Deployment"/> row plus an audit <see cref="ConfigEdit"/>, then hands the deployment
    /// to the coordinator. Replies with <see cref="StartDeploymentResult"/>: AnotherDeploymentInFlight,
    /// Rejected (validation errors or any failure), or Accepted (carrying the optional compile-cost
    /// advisory as its Message).
    /// </summary>
    private async Task HandleStartDeploymentAsync(StartDeployment msg)
    {
        var replyTo = Sender;
        try
        {
            await using var db = await _dbFactory.CreateDbContextAsync();

            // Refuse if any deployment is already in flight — keeps the coordinator's state
            // unambiguous. The UI is expected to wait for the in-flight one to seal or fail.
            var inflight = await db.Deployments
                .Where(d => d.Status == DeploymentStatus.Dispatching || d.Status == DeploymentStatus.AwaitingApplyAcks)
                .Select(d => d.DeploymentId)
                .FirstOrDefaultAsync();
            if (inflight != Guid.Empty)
            {
                replyTo.Tell(new StartDeploymentResult(
                    StartDeploymentOutcome.AnotherDeploymentInFlight,
                    DeploymentId: new DeploymentId(inflight),
                    RevisionHash: null,
                    Message: $"Deployment {inflight:N} is still in flight.",
                    msg.CorrelationId));
                return;
            }

            // Full pre-seal gate: run the complete DraftValidator (one pass, all rules) and reject
            // on ANY validation error. The config is now canonical — the company overlay loader emits
            // canonical EquipmentIds and the seed is clean — so every rule (UNS segments, EquipmentId
            // derivation, cross-cluster/namespace binding, driver-namespace compat, signal collisions,
            // …) gates the deploy. A green build in this repo does not prove the config is valid; this
            // is the last guard before a bad address space (or a non-derived EquipmentId) ships.
            // NOTE: a BadDuplicateExternalIdentifier rejection from the reservation pre-flight can
            // (rarely) be a false positive if an external-id reservation release has not yet been
            // committed/visible when the snapshot is read — operators seeing a spurious one should
            // check ExternalIdReservation state before re-submitting.
            var draft = await DraftSnapshotFactory.FromConfigDbAsync(db);
            var errors = DraftValidator.Validate(draft);
            if (errors.Count > 0)
            {
                var summary = string.Join("; ", errors.Select(e => $"[{e.Code}] {e.Message}"));
                _log.Warning("StartDeployment rejected ({Count} validation error(s)): {Summary}",
                    errors.Count, summary);
                replyTo.Tell(new StartDeploymentResult(
                    StartDeploymentOutcome.Rejected,
                    DeploymentId: null,
                    RevisionHash: null,
                    Message: summary,
                    msg.CorrelationId));
                return;
            }

            // Advisory only: the helper swallows (and logs) its own failures, so a failed estimate
            // can never reach the outer catch and turn a valid deploy into a Rejected reply.
            var advisory = await TryEstimateCompileCostAdvisoryAsync(db);

            var artifact = await ConfigComposer.SnapshotAndFlattenAsync(db);
            var deploymentId = DeploymentId.NewId();
            var revHash = RevisionHash.Parse(artifact.RevisionHash);

            db.Deployments.Add(new Deployment
            {
                DeploymentId = deploymentId.Value,
                RevisionHash = artifact.RevisionHash,
                Status = DeploymentStatus.Dispatching,
                CreatedBy = msg.CreatedBy,
                ArtifactBlob = artifact.Blob,
            });

            // Marker ConfigEdit row so the audit timeline shows the deployment snapshot.
            db.ConfigEdits.Add(new ConfigEdit
            {
                EntityType = "Deployment",
                EntityId = deploymentId.Value,
                FieldsJson = $"{{\"revisionHash\":\"{artifact.RevisionHash}\",\"sizeBytes\":{artifact.Blob.Length}}}",
                EditedBy = msg.CreatedBy,
                SourceNode = LocalSourceNode(),
            });

            // Persist before dispatching so the coordinator never sees a deployment that is not in the DB.
            await db.SaveChangesAsync();

            _coordinator.Tell(new DispatchDeployment(deploymentId, revHash, msg.CorrelationId));

            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Accepted,
                deploymentId,
                revHash,
                Message: advisory,
                msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "StartDeployment failed for {CreatedBy}", msg.CreatedBy);
            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Rejected,
                DeploymentId: null,
                RevisionHash: null,
                Message: ex.Message,
                msg.CorrelationId));
        }
    }

    /// <summary>
    /// Warn-only compile-cost guardrail (NEVER blocks). Each genuinely-compiled (non-passthrough)
    /// script costs ~<see cref="PerCompiledScriptMiB"/> MiB RSS per node to materialize via Roslyn.
    /// Passthrough "mirror" scripts compile to ~nothing, so they are excluded. Distinct sources are
    /// counted, because the compile cache keys on source and duplicates collapse to one unit.
    /// </summary>
    /// <param name="db">Open config context (read-only use; queried with AsNoTracking).</param>
    /// <returns>
    /// An operator-facing advisory to ride in the Accepted message, or <c>null</c> when no script
    /// compiles or the estimate itself failed (e.g. a transient SQL error), which is logged as a Warning.
    /// </returns>
    private async Task<string?> TryEstimateCompileCostAdvisoryAsync(OtOpcUaConfigDbContext db)
    {
        try
        {
            var scriptSources = await db.Scripts.AsNoTracking().Select(s => s.SourceCode).ToListAsync();
            var compiled = scriptSources
                .Where(src => !PassthroughScript.TryMatch(src, out _))
                .Distinct(StringComparer.Ordinal)
                .Count();
            if (compiled == 0)
            {
                return null;
            }

            var estMiB = compiled * PerCompiledScriptMiB;
            _log.Warning(
                "StartDeployment: {Compiled} script(s) will compile (~{EstMiB:F0} MiB RSS per node); ensure node mem_limit covers it",
                compiled, estMiB);
            return $"{compiled} script(s) will compile (~{estMiB:F0} MiB/node)";
        }
        catch (Exception ex)
        {
            // Guardrail is advisory-only — a failed estimate must never block a valid deploy.
            _log.Warning(ex, "StartDeployment: script compile-cost estimate failed; advisory skipped");
            return null;
        }
    }

    /// <summary>
    /// Runs the registered <see cref="IDriverProbe"/> for <c>msg.DriverType</c> with a timeout clamped
    /// to 1–60 seconds and replies with a <see cref="TestDriverConnectResult"/>. Elapsed milliseconds
    /// are reported only for a successful probe.
    /// </summary>
    private async Task HandleTestDriverConnectAsync(TestDriverConnect msg)
    {
        var replyTo = Sender;

        // Guard null explicitly: Dictionary.TryGetValue throws ArgumentNullException on a null key,
        // which (outside any try) would restart the actor and leave the caller without a reply.
        if (msg.DriverType is null || !_probesByType.TryGetValue(msg.DriverType, out var probe))
        {
            replyTo.Tell(new TestDriverConnectResult(
                false,
                $"No probe registered for driver type '{msg.DriverType}'.",
                null,
                msg.CorrelationId));
            return;
        }

        var clampedSec = Math.Clamp(msg.TimeoutSeconds, 1, 60);
        var timeout = TimeSpan.FromSeconds(clampedSec);
        using var cts = new CancellationTokenSource(timeout);

        var sw = System.Diagnostics.Stopwatch.StartNew();
        try
        {
            var result = await probe.ProbeAsync(msg.ConfigJson, timeout, cts.Token);
            sw.Stop();
            replyTo.Tell(new TestDriverConnectResult(
                result.Ok,
                result.Message,
                result.Ok ? sw.Elapsed.TotalMilliseconds : (double?)null,
                msg.CorrelationId));
        }
        catch (OperationCanceledException)
        {
            // The probe is also handed the raw timeout, so any cancellation is reported as a timeout.
            replyTo.Tell(new TestDriverConnectResult(
                false,
                $"Probe timed out after {clampedSec}s.",
                null,
                msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "Probe for {DriverType} threw", msg.DriverType);
            replyTo.Tell(new TestDriverConnectResult(
                false,
                ex.Message,
                null,
                msg.CorrelationId));
        }
    }

    /// <summary>
    /// Audits, then broadcasts, a driver restart request. Replies with a <see cref="RestartDriverResult"/>.
    /// A failure reply guarantees the command was NOT dispatched, so the caller may safely retry.
    /// </summary>
    private async Task HandleRestartDriverAsync(RestartDriver msg)
    {
        var replyTo = Sender;
        try
        {
            // Audit first, then dispatch. If the audit row cannot be saved nothing has been sent yet,
            // so the failure reply is truthful and a retry cannot double-restart the driver.
            await using var db = await _dbFactory.CreateDbContextAsync();
            AddDriverControlAudit(db, "restart", msg.DriverInstanceId, msg.ActorByUserName);
            await db.SaveChangesAsync();

            // Broadcast to every DriverHostActor on every node via the driver-control DPS topic.
            // Only the host that owns the instance will act; others ignore it (id not found in _children).
            DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DriverControlTopic.Name, msg));

            _log.Info("AdminOps: RestartDriver dispatched for {DriverInstanceId} by {User}",
                msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new RestartDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: RestartDriver failed for {DriverInstanceId}", msg.DriverInstanceId);
            replyTo.Tell(new RestartDriverResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Audits, then broadcasts, a driver reconnect request. Replies with a <see cref="ReconnectDriverResult"/>.
    /// A failure reply guarantees the command was NOT dispatched, so the caller may safely retry.
    /// </summary>
    private async Task HandleReconnectDriverAsync(ReconnectDriver msg)
    {
        var replyTo = Sender;
        try
        {
            // Audit first, then dispatch — same reasoning as HandleRestartDriverAsync.
            await using var db = await _dbFactory.CreateDbContextAsync();
            AddDriverControlAudit(db, "reconnect", msg.DriverInstanceId, msg.ActorByUserName);
            await db.SaveChangesAsync();

            // Broadcast to every DriverHostActor; only the one owning the instance reacts.
            DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DriverControlTopic.Name, msg));

            _log.Info("AdminOps: ReconnectDriver dispatched for {DriverInstanceId} by {User}",
                msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new ReconnectDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: ReconnectDriver failed for {DriverInstanceId}", msg.DriverInstanceId);
            replyTo.Tell(new ReconnectDriverResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Stages (does not save) the audit <see cref="ConfigEdit"/> row for a driver-control operation.
    /// </summary>
    /// <param name="db">Context the row is added to; the caller is responsible for SaveChangesAsync.</param>
    /// <param name="op">Operation name written into FieldsJson (e.g. "restart", "reconnect").</param>
    /// <param name="driverInstanceId">Target instance id; non-GUID ids are audited under <see cref="Guid.Empty"/>.</param>
    /// <param name="editedBy">User who requested the operation.</param>
    private void AddDriverControlAudit(OtOpcUaConfigDbContext db, string op, string driverInstanceId, string editedBy)
    {
        db.ConfigEdits.Add(new ConfigEdit
        {
            EntityType = "DriverInstance",
            EntityId = Guid.TryParse(driverInstanceId, out var guid) ? guid : Guid.Empty,
            // The instance id is caller-supplied, so it is JSON-encoded rather than spliced in raw.
            FieldsJson = $"{{\"op\":\"{op}\",\"driverInstanceId\":{System.Text.Json.JsonSerializer.Serialize(driverInstanceId)}}}",
            EditedBy = editedBy,
            SourceNode = LocalSourceNode(),
        });
    }

    /// <summary>
    /// Host of this node's cluster address, used as the audit SourceNode; "unknown" when the
    /// address has no host. Must be called from within message processing (it reads Context).
    /// </summary>
    private string LocalSourceNode() =>
        Akka.Cluster.Cluster.Get(Context.System).SelfAddress.Host ?? "unknown";
}
|