370a2b7b48
T21: add an AdminUI path for acknowledging/shelving alarms that routes through the admin-pinned AdminOperationsActor cluster singleton, which republishes onto the same 'alarm-commands' DPS topic used by the OPC UA method path (T18) and the engine subscriber (T19). The broadcast, combined with the ScriptedAlarmHostActor ownership filter, handles cross-node routing, so the singleton needs no knowledge of which node owns the alarm.
- Commons: AcknowledgeAlarmCommand/ShelveAlarmCommand (plus result records) and a shared AlarmCommandsTopic constant; ScriptedAlarmHostActor now re-exports that constant (mirroring the DriverControlTopic pattern).
- AdminOperationsActor: two handlers map the control-plane messages to AlarmCommand (Acknowledge / OneShotShelve / TimedShelve / Unshelve, threading User/Comment/UnshelveAtUtc through) and publish via the DPS mediator.
- IAdminOperationsClient + AdminOperationsClient: typed Acknowledge/Shelve ask wrappers mirroring StartDeploymentAsync.
- Alerts.razor: per-row Ack/Shelve/Unshelve controls gated on DriverOperator; the operator name comes from AuthenticationState. The timed-shelve date/time UI is deferred.
- 5 TestKit tests (a mediator probe subscribed to alarm-commands) verifying each kind's mapping and reply; 56/56 ControlPlane tests green.
379 lines
18 KiB
C#
379 lines
18 KiB
C#
using Akka.Actor;
|
|
using Akka.Cluster.Tools.PublishSubscribe;
|
|
using Akka.Event;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Validation;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations;
|
|
|
|
/// <summary>
/// Cluster-singleton admin operations actor. Owns the "snapshot the live-edit state and start
/// a deployment" workflow, the driver test-connect / restart / reconnect operations and the
/// AdminUI alarm Acknowledge / Shelve / Unshelve path, plus (eventually) all mutating live-edit
/// ops invoked by the admin UI. Routed to via <see cref="Commons.Interfaces.IAdminOperationsClient"/>
/// from anywhere in the cluster.
/// </summary>
/// <remarks>
/// Each handler captures <c>Sender</c> before doing any work and replies with the request's
/// correlation id on both its success and failure paths, so the asking client learns about a
/// failure from the reply rather than from an ask timeout.
/// </remarks>
public sealed class AdminOperationsActor : ReceiveActor
{
    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
    private readonly IActorRef _coordinator;
    private readonly IReadOnlyDictionary<string, IDriverProbe> _probesByType;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    /// <summary>Creates actor props for the admin operations actor.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">Driver probes registered in DI; keyed by DriverType (case-insensitive).</param>
    /// <returns>Props configured to create an AdminOperationsActor.</returns>
    public static Props Props(
        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
        IActorRef coordinator,
        IEnumerable<IDriverProbe> probes) =>
        Akka.Actor.Props.Create(() => new AdminOperationsActor(dbFactory, coordinator, probes));

    /// <summary>Initializes a new instance of the AdminOperationsActor.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">Driver probes registered in DI; keyed by DriverType (case-insensitive).</param>
    public AdminOperationsActor(
        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
        IActorRef coordinator,
        IEnumerable<IDriverProbe> probes)
    {
        _dbFactory = dbFactory;
        _coordinator = coordinator;
        _probesByType = probes.ToDictionary(p => p.DriverType, StringComparer.OrdinalIgnoreCase);

        ReceiveAsync<StartDeployment>(HandleStartDeploymentAsync);
        ReceiveAsync<TestDriverConnect>(HandleTestDriverConnectAsync);
        ReceiveAsync<RestartDriver>(HandleRestartDriverAsync);
        ReceiveAsync<ReconnectDriver>(HandleReconnectDriverAsync);
        Receive<AcknowledgeAlarmCommand>(HandleAcknowledgeAlarm);
        Receive<ShelveAlarmCommand>(HandleShelveAlarm);
    }

    /// <summary>
    /// Cluster distributed pub-sub mediator used for the driver-control and alarm-command
    /// broadcasts. Must only be read from inside a message handler (it goes through <c>Context</c>).
    /// </summary>
    private IActorRef PubSubMediator => DistributedPubSub.Get(Context.System).Mediator;

    /// <summary>
    /// This node's host name, recorded as <c>SourceNode</c> on audit <see cref="ConfigEdit"/> rows;
    /// <c>"unknown"</c> when the cluster self-address carries no host.
    /// </summary>
    private string SourceNodeName => Akka.Cluster.Cluster.Get(Context.System).SelfAddress.Host ?? "unknown";

    /// <summary>
    /// Broadcasts <paramref name="command"/> on the cluster <c>alarm-commands</c> topic
    /// (<see cref="AlarmCommandsTopic.Name"/>). Fire-and-forget: the <c>Tell</c> to the mediator
    /// neither waits for nor reports delivery to the subscribers.
    /// </summary>
    /// <param name="command">The engine-level alarm command to broadcast.</param>
    private void PublishAlarmCommand(AlarmCommand command) =>
        PubSubMediator.Tell(new Publish(AlarmCommandsTopic.Name, command));

    /// <summary>
    /// AdminUI Acknowledge path. Maps the control-plane command to a
    /// <see cref="AlarmCommand"/> (<c>Operation = "Acknowledge"</c>) and publishes it onto the
    /// cluster <c>alarm-commands</c> topic. The broadcast lands on every driver node's
    /// <c>ScriptedAlarmHostActor</c>; only the node owning the alarm acts (ownership filter), so the
    /// admin singleton needs no knowledge of placement. Synchronous — the publish is fire-and-forget
    /// via the mediator, so there is no awaitable work and no DB write.
    /// </summary>
    /// <param name="msg">The acknowledge request; replied to with an <see cref="AcknowledgeAlarmResult"/>.</param>
    private void HandleAcknowledgeAlarm(AcknowledgeAlarmCommand msg)
    {
        var replyTo = Sender;
        try
        {
            PublishAlarmCommand(new AlarmCommand(
                AlarmId: msg.AlarmId,
                Operation: "Acknowledge",
                User: msg.User,
                Comment: msg.Comment,
                UnshelveAtUtc: null));

            _log.Info("AdminOps: Acknowledge published for alarm {AlarmId} by {User}", msg.AlarmId, msg.User);
            replyTo.Tell(new AcknowledgeAlarmResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: Acknowledge failed for alarm {AlarmId}", msg.AlarmId);
            replyTo.Tell(new AcknowledgeAlarmResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// AdminUI Shelve / Unshelve path. Maps <see cref="ShelveAlarmCommand.Kind"/> to the matching
    /// <see cref="AlarmCommand"/> operation (<c>OneShotShelve</c> / <c>TimedShelve</c> /
    /// <c>Unshelve</c>), threading <see cref="ShelveAlarmCommand.UnshelveAtUtc"/> for the timed kind
    /// only, and publishes onto the cluster <c>alarm-commands</c> topic. Ownership filtering happens
    /// on the owning node exactly as for Acknowledge.
    /// </summary>
    /// <param name="msg">The shelve request; replied to with a <see cref="ShelveAlarmResult"/>.</param>
    private void HandleShelveAlarm(ShelveAlarmCommand msg)
    {
        var replyTo = Sender;
        try
        {
            var (operation, unshelveAt) = msg.Kind switch
            {
                ShelveKind.OneShot => ("OneShotShelve", (DateTime?)null),
                ShelveKind.Timed => ("TimedShelve", msg.UnshelveAtUtc),
                ShelveKind.Unshelve => ("Unshelve", (DateTime?)null),
                // Caught below and reported to the caller as a failed ShelveAlarmResult.
                _ => throw new ArgumentOutOfRangeException(nameof(msg), msg.Kind, "Unknown shelve kind."),
            };

            // TimedShelve requires an unshelve instant — the engine rejects it otherwise. Guard here so
            // the AdminUI gets an immediate, attributable failure instead of a silently-dropped command.
            if (msg.Kind == ShelveKind.Timed && unshelveAt is null)
            {
                replyTo.Tell(new ShelveAlarmResult(false, "TimedShelve requires UnshelveAtUtc.", msg.CorrelationId));
                return;
            }

            PublishAlarmCommand(new AlarmCommand(
                AlarmId: msg.AlarmId,
                Operation: operation,
                User: msg.User,
                Comment: msg.Comment,
                UnshelveAtUtc: unshelveAt));

            _log.Info("AdminOps: {Operation} published for alarm {AlarmId} by {User}", operation, msg.AlarmId, msg.User);
            replyTo.Tell(new ShelveAlarmResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: Shelve ({Kind}) failed for alarm {AlarmId}", msg.Kind, msg.AlarmId);
            replyTo.Tell(new ShelveAlarmResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Starts a deployment. In order, it:
    /// <list type="number">
    /// <item>Refuses when another deployment is already in flight.</item>
    /// <item>Runs the full <c>DraftValidator</c> gate.</item>
    /// <item>Computes a warn-only compile-cost advisory.</item>
    /// <item>Snapshots and flattens the live-edit config into an artifact.</item>
    /// <item>Persists the <see cref="Deployment"/> row (status <c>Dispatching</c>) together with an
    /// audit <see cref="ConfigEdit"/> marker.</item>
    /// <item>Only after the save succeeds, hands the deployment to the coordinator.</item>
    /// </list>
    /// </summary>
    /// <param name="msg">
    /// The deployment request; replied to with a <see cref="StartDeploymentResult"/> whose outcome is
    /// <c>AnotherDeploymentInFlight</c>, <c>Rejected</c> (a validation error or any failure), or
    /// <c>Accepted</c>, which carries the optional compile-cost advisory.
    /// </param>
    private async Task HandleStartDeploymentAsync(StartDeployment msg)
    {
        var replyTo = Sender;
        try
        {
            await using var db = await _dbFactory.CreateDbContextAsync();

            // Refuse if any deployment is already in flight — keeps the coordinator's state
            // unambiguous. The UI is expected to wait for the in-flight one to seal or fail.
            var inflight = await db.Deployments
                .Where(d => d.Status == DeploymentStatus.Dispatching || d.Status == DeploymentStatus.AwaitingApplyAcks)
                .Select(d => d.DeploymentId)
                .FirstOrDefaultAsync();
            if (inflight != Guid.Empty)
            {
                replyTo.Tell(new StartDeploymentResult(
                    StartDeploymentOutcome.AnotherDeploymentInFlight,
                    DeploymentId: new DeploymentId(inflight),
                    RevisionHash: null,
                    Message: $"Deployment {inflight:N} is still in flight.",
                    msg.CorrelationId));
                return;
            }

            // Full pre-seal gate: run the complete DraftValidator (one pass, all rules) and reject
            // on ANY validation error. The config is now canonical — the company overlay loader emits
            // canonical EquipmentIds and the seed is clean — so every rule (UNS segments, EquipmentId
            // derivation, cross-cluster/namespace binding, driver-namespace compat, signal collisions,
            // …) gates the deploy. A green build in this repo does not prove the config is valid; this
            // is the last guard before a bad address space (or a non-derived EquipmentId) ships.
            // NOTE: a BadDuplicateExternalIdentifier rejection from the reservation pre-flight can
            // (rarely) be a false positive if an external-id reservation release has not yet been
            // committed/visible when the snapshot is read — operators seeing a spurious one should
            // check ExternalIdReservation state before re-submitting.
            var draft = await DraftSnapshotFactory.FromConfigDbAsync(db);
            var errors = DraftValidator.Validate(draft);
            if (errors.Count > 0)
            {
                var summary = string.Join("; ", errors.Select(e => $"[{e.Code}] {e.Message}"));
                _log.Warning("StartDeployment rejected ({Count} validation error(s)): {Summary}",
                    errors.Count, summary);
                replyTo.Tell(new StartDeploymentResult(
                    StartDeploymentOutcome.Rejected,
                    DeploymentId: null,
                    RevisionHash: null,
                    Message: summary,
                    msg.CorrelationId));
                return;
            }

            // Advisory only: EstimateCompileCostAdvisoryAsync swallows its own failures and never
            // blocks a valid deploy. The DraftValidator gate above is the hard reject.
            var advisory = await EstimateCompileCostAdvisoryAsync(db);

            var artifact = await ConfigComposer.SnapshotAndFlattenAsync(db);
            var deploymentId = DeploymentId.NewId();
            var revHash = RevisionHash.Parse(artifact.RevisionHash);

            db.Deployments.Add(new Deployment
            {
                DeploymentId = deploymentId.Value,
                RevisionHash = artifact.RevisionHash,
                Status = DeploymentStatus.Dispatching,
                CreatedBy = msg.CreatedBy,
                ArtifactBlob = artifact.Blob,
            });

            // Marker ConfigEdit row so the audit timeline shows the deployment snapshot. The hash is
            // JSON-serialized (not spliced raw) so FieldsJson stays valid whatever the hash text is,
            // matching how the driver-op audit rows build their JSON.
            db.ConfigEdits.Add(new ConfigEdit
            {
                EntityType = "Deployment",
                EntityId = deploymentId.Value,
                FieldsJson = $"{{\"revisionHash\":{System.Text.Json.JsonSerializer.Serialize(artifact.RevisionHash)},\"sizeBytes\":{artifact.Blob.Length}}}",
                EditedBy = msg.CreatedBy,
                SourceNode = SourceNodeName,
            });

            await db.SaveChangesAsync();

            // Dispatch only after the Deployment row is durable.
            _coordinator.Tell(new DispatchDeployment(deploymentId, revHash, msg.CorrelationId));

            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Accepted,
                deploymentId,
                revHash,
                Message: advisory,
                msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "StartDeployment failed for {CreatedBy}", msg.CreatedBy);
            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Rejected,
                DeploymentId: null,
                RevisionHash: null,
                Message: ex.Message,
                msg.CorrelationId));
        }
    }

    /// <summary>
    /// Warn-only compile-cost guardrail used by <see cref="HandleStartDeploymentAsync"/>; it NEVER
    /// blocks a deploy. Each genuinely-compiled (non-passthrough) script costs ~1.66 MiB RSS per node
    /// to materialize via Roslyn (measured post-A0; design doc 2026-06-07). Passthrough "mirror"
    /// scripts compile to ~nothing, so they are excluded. Distinct sources are counted, because the
    /// compile cache keys on source and duplicates collapse to one unit.
    /// </summary>
    /// <param name="db">The open config context of the in-progress deployment request.</param>
    /// <returns>
    /// Advisory text for the Accepted reply. Returns <c>null</c> when no script needs compiling, or
    /// when the estimate query failed (e.g. transient SQL); such a failure is logged as a warning and
    /// swallowed.
    /// </returns>
    private async Task<string?> EstimateCompileCostAdvisoryAsync(OtOpcUaConfigDbContext db)
    {
        const double PerScriptMiB = 1.66; // measured post-A0 per-script RSS (design doc 2026-06-07)
        try
        {
            var scriptSources = await db.Scripts.AsNoTracking().Select(s => s.SourceCode).ToListAsync();
            var compiled = scriptSources
                .Where(src => !PassthroughScript.TryMatch(src, out _))
                .Distinct(StringComparer.Ordinal)
                .Count();
            if (compiled == 0)
            {
                return null;
            }

            var estMiB = compiled * PerScriptMiB;
            _log.Warning(
                "StartDeployment: {Compiled} script(s) will compile (~{EstMiB:F0} MiB RSS per node); ensure node mem_limit covers it",
                compiled, estMiB);
            return $"{compiled} script(s) will compile (~{estMiB:F0} MiB/node)";
        }
        catch (Exception ex)
        {
            // Guardrail is advisory-only — a failed estimate must never block a valid deploy.
            _log.Warning(ex, "StartDeployment: script compile-cost estimate failed; advisory skipped");
            return null;
        }
    }

    /// <summary>
    /// Runs the registered <see cref="IDriverProbe"/> for <c>msg.DriverType</c> against the supplied
    /// config. The probe's timeout is clamped to 1–60 seconds. Replies with a
    /// <see cref="TestDriverConnectResult"/> carrying the probe's verdict and, on success, the elapsed
    /// milliseconds.
    /// </summary>
    /// <param name="msg">The test-connect request.</param>
    private async Task HandleTestDriverConnectAsync(TestDriverConnect msg)
    {
        var replyTo = Sender;

        // A null DriverType would make the dictionary lookup throw outside any try/catch, crashing
        // the actor without replying; report it as "no probe registered" instead.
        if (msg.DriverType is null || !_probesByType.TryGetValue(msg.DriverType, out var probe))
        {
            replyTo.Tell(new TestDriverConnectResult(
                false,
                $"No probe registered for driver type '{msg.DriverType}'.",
                null,
                msg.CorrelationId));
            return;
        }

        var clampedSec = Math.Clamp(msg.TimeoutSeconds, 1, 60);
        var timeout = TimeSpan.FromSeconds(clampedSec);
        using var cts = new CancellationTokenSource(timeout);

        var sw = System.Diagnostics.Stopwatch.StartNew();
        try
        {
            var result = await probe.ProbeAsync(msg.ConfigJson, timeout, cts.Token);
            sw.Stop();
            replyTo.Tell(new TestDriverConnectResult(
                result.Ok,
                result.Message,
                result.Ok ? sw.Elapsed.TotalMilliseconds : (double?)null,
                msg.CorrelationId));
        }
        catch (OperationCanceledException)
        {
            replyTo.Tell(new TestDriverConnectResult(
                false,
                $"Probe timed out after {clampedSec}s.",
                null,
                msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "Probe for {DriverType} threw", msg.DriverType);
            replyTo.Tell(new TestDriverConnectResult(
                false,
                ex.Message,
                null,
                msg.CorrelationId));
        }
    }

    /// <summary>
    /// Records a "restart" audit row for the driver instance, then broadcasts the
    /// <see cref="RestartDriver"/> message on the driver-control topic. Replies with a
    /// <see cref="RestartDriverResult"/>.
    /// </summary>
    /// <param name="msg">The restart request.</param>
    private async Task HandleRestartDriverAsync(RestartDriver msg)
    {
        var replyTo = Sender;
        try
        {
            // Persist the audit row BEFORE broadcasting (same persist-then-dispatch order as
            // StartDeployment). If the write fails nothing has been dispatched, so the failure reply
            // is truthful rather than reporting "failed" for a restart that was already sent.
            await using var db = await _dbFactory.CreateDbContextAsync();
            db.ConfigEdits.Add(new ConfigEdit
            {
                EntityType = "DriverInstance",
                // Non-GUID instance ids are still audited, under Guid.Empty, with the raw id in FieldsJson.
                EntityId = Guid.TryParse(msg.DriverInstanceId, out var guid) ? guid : Guid.Empty,
                FieldsJson = $"{{\"op\":\"restart\",\"driverInstanceId\":{System.Text.Json.JsonSerializer.Serialize(msg.DriverInstanceId)}}}",
                EditedBy = msg.ActorByUserName,
                SourceNode = SourceNodeName,
            });
            await db.SaveChangesAsync();

            // Broadcast to every DriverHostActor on every node via the driver-control DPS topic.
            // Only the host that owns the instance will act; others ignore it (id not found in _children).
            PubSubMediator.Tell(new Publish(DriverControlTopic.Name, msg));

            _log.Info("AdminOps: RestartDriver dispatched for {DriverInstanceId} by {User}",
                msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new RestartDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: RestartDriver failed for {DriverInstanceId}", msg.DriverInstanceId);
            replyTo.Tell(new RestartDriverResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Records a "reconnect" audit row for the driver instance, then broadcasts the
    /// <see cref="ReconnectDriver"/> message on the driver-control topic. Replies with a
    /// <see cref="ReconnectDriverResult"/>.
    /// </summary>
    /// <param name="msg">The reconnect request.</param>
    private async Task HandleReconnectDriverAsync(ReconnectDriver msg)
    {
        var replyTo = Sender;
        try
        {
            // Persist the audit row BEFORE broadcasting, so a failed write means nothing was
            // dispatched and the failure reply is accurate.
            await using var db = await _dbFactory.CreateDbContextAsync();
            db.ConfigEdits.Add(new ConfigEdit
            {
                EntityType = "DriverInstance",
                // Non-GUID instance ids are still audited, under Guid.Empty, with the raw id in FieldsJson.
                EntityId = Guid.TryParse(msg.DriverInstanceId, out var guid) ? guid : Guid.Empty,
                FieldsJson = $"{{\"op\":\"reconnect\",\"driverInstanceId\":{System.Text.Json.JsonSerializer.Serialize(msg.DriverInstanceId)}}}",
                EditedBy = msg.ActorByUserName,
                SourceNode = SourceNodeName,
            });
            await db.SaveChangesAsync();

            // Broadcast to every DriverHostActor; only the one owning the instance reacts.
            PubSubMediator.Tell(new Publish(DriverControlTopic.Name, msg));

            _log.Info("AdminOps: ReconnectDriver dispatched for {DriverInstanceId} by {User}",
                msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new ReconnectDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: ReconnectDriver failed for {DriverInstanceId}", msg.DriverInstanceId);
            replyTo.Tell(new ReconnectDriverResult(false, ex.Message, msg.CorrelationId));
        }
    }
}
|