Files
lmxopcua/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/AdminOperations/AdminOperationsActor.cs
T
Joseph Doherty 662f3f9f5c
v2-ci / build (push) Failing after 32s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
refactor(driver-pages): address Phase 6/8 deep-review findings
- Topic-name drift fix: DriverHealthChanged.TopicName and
  DriverControlTopic.Name now live on the message contracts in
  Commons. AkkaDriverHealthPublisher, DriverStatusSignalRBridge,
  DriverHostActor, and AdminOperationsActor all delegate to the
  single constant so a rename can't silently desynchronise
  publisher and subscriber.
- DriverStatusPanel._opResultClearTimer switched from
  System.Timers.Timer to System.Threading.Timer + awaited
  DisposeAsync. Prevents an in-flight 8s clear-callback from
  invoking StateHasChanged on a component whose hub has already
  been released.
- PublishHealthSnapshot deduplicates against the last published
  (state, lastSuccess, lastError, errorCount) fingerprint. The
  30s heartbeat no longer floods the SignalR layer with identical
  Healthy snapshots — newly-joined clients still warm up via the
  snapshot store on JoinDriver.
2026-05-28 11:52:20 -04:00

235 lines
10 KiB
C#

using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations;
/// <summary>
/// Cluster-singleton admin operations actor. Owns the "snapshot the live-edit state and start
/// a deployment" workflow plus (eventually) all mutating live-edit ops invoked by the admin UI.
/// Routed to via <see cref="Commons.Interfaces.IAdminOperationsClient"/> from anywhere in the cluster.
/// </summary>
/// <remarks>
/// All handlers are registered with <c>ReceiveAsync</c>. Each one captures <c>Sender</c> before its
/// first <c>await</c> and replies on both its success and failure paths.
/// </remarks>
public sealed class AdminOperationsActor : ReceiveActor
{
    /// <summary>Lower bound (seconds) applied to a requested probe timeout.</summary>
    private const int MinProbeTimeoutSeconds = 1;

    /// <summary>Upper bound (seconds) applied to a requested probe timeout.</summary>
    private const int MaxProbeTimeoutSeconds = 60;

    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
    private readonly IActorRef _coordinator;
    private readonly IReadOnlyDictionary<string, IDriverProbe> _probesByType;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    /// <summary>Creates actor props for the admin operations actor.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">Driver probes registered in DI; keyed by DriverType (case-insensitive).</param>
    /// <returns>Props configured to create an AdminOperationsActor.</returns>
    public static Props Props(
        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
        IActorRef coordinator,
        IEnumerable<IDriverProbe> probes) =>
        Akka.Actor.Props.Create(() => new AdminOperationsActor(dbFactory, coordinator, probes));

    /// <summary>Initializes a new instance of the AdminOperationsActor.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">
    /// Driver probes registered in DI; keyed by DriverType (case-insensitive). Two probes sharing a
    /// DriverType make <c>ToDictionary</c> throw, so the actor fails to start.
    /// </param>
    public AdminOperationsActor(
        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
        IActorRef coordinator,
        IEnumerable<IDriverProbe> probes)
    {
        _dbFactory = dbFactory;
        _coordinator = coordinator;
        _probesByType = probes.ToDictionary(p => p.DriverType, StringComparer.OrdinalIgnoreCase);

        ReceiveAsync<StartDeployment>(HandleStartDeploymentAsync);
        ReceiveAsync<TestDriverConnect>(HandleTestDriverConnectAsync);
        ReceiveAsync<RestartDriver>(HandleRestartDriverAsync);
        ReceiveAsync<ReconnectDriver>(HandleReconnectDriverAsync);
    }

    /// <summary>
    /// Snapshots the live-edit configuration, records a new <see cref="Deployment"/> plus an audit
    /// <see cref="ConfigEdit"/>, and hands the deployment to the coordinator.
    /// </summary>
    /// <remarks>
    /// Replies with <c>AnotherDeploymentInFlight</c> when a deployment is still Dispatching or
    /// AwaitingApplyAcks. Replies with <c>Accepted</c> after the rows are saved and
    /// <see cref="DispatchDeployment"/> is sent. Replies with <c>Rejected</c> (carrying the exception
    /// message) on any failure.
    /// </remarks>
    private async Task HandleStartDeploymentAsync(StartDeployment msg)
    {
        var replyTo = Sender;
        try
        {
            var sourceNode = GetSourceNode();
            await using var db = await _dbFactory.CreateDbContextAsync();

            // Refuse if any deployment is already in flight — keeps the coordinator's state
            // unambiguous. The UI is expected to wait for the in-flight one to seal or fail.
            // Projecting to Guid? distinguishes "no matching row" from a row whose id is Guid.Empty.
            var inflight = await db.Deployments
                .Where(d => d.Status == DeploymentStatus.Dispatching || d.Status == DeploymentStatus.AwaitingApplyAcks)
                .Select(d => (Guid?)d.DeploymentId)
                .FirstOrDefaultAsync();
            if (inflight is { } inflightId)
            {
                replyTo.Tell(new StartDeploymentResult(
                    StartDeploymentOutcome.AnotherDeploymentInFlight,
                    DeploymentId: new DeploymentId(inflightId),
                    RevisionHash: null,
                    Message: $"Deployment {inflightId:N} is still in flight.",
                    msg.CorrelationId));
                return;
            }

            var artifact = await ConfigComposer.SnapshotAndFlattenAsync(db);
            var deploymentId = DeploymentId.NewId();
            // Parsed before anything is persisted, so a malformed hash is rejected with no rows written.
            var revHash = RevisionHash.Parse(artifact.RevisionHash);

            db.Deployments.Add(new Deployment
            {
                DeploymentId = deploymentId.Value,
                RevisionHash = artifact.RevisionHash,
                Status = DeploymentStatus.Dispatching,
                CreatedBy = msg.CreatedBy,
                ArtifactBlob = artifact.Blob,
            });

            // Marker ConfigEdit row so the audit timeline shows the deployment snapshot.
            // Serialized (not hand-interpolated) so any character in the hash is escaped correctly.
            db.ConfigEdits.Add(new ConfigEdit
            {
                EntityType = "Deployment",
                EntityId = deploymentId.Value,
                FieldsJson = System.Text.Json.JsonSerializer.Serialize(new
                {
                    revisionHash = artifact.RevisionHash,
                    sizeBytes = artifact.Blob.Length,
                }),
                EditedBy = msg.CreatedBy,
                SourceNode = sourceNode,
            });

            await db.SaveChangesAsync();

            _coordinator.Tell(new DispatchDeployment(deploymentId, revHash, msg.CorrelationId));
            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Accepted,
                deploymentId,
                revHash,
                Message: null,
                msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "StartDeployment failed for {CreatedBy}", msg.CreatedBy);
            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Rejected,
                DeploymentId: null,
                RevisionHash: null,
                Message: ex.Message,
                msg.CorrelationId));
        }
    }

    /// <summary>
    /// Runs the registered <see cref="IDriverProbe"/> for <c>msg.DriverType</c> against the supplied
    /// config JSON and replies with a <see cref="TestDriverConnectResult"/>.
    /// </summary>
    /// <remarks>
    /// The requested timeout is clamped to [<see cref="MinProbeTimeoutSeconds"/>,
    /// <see cref="MaxProbeTimeoutSeconds"/>]. Latency is reported only for successful probes.
    /// </remarks>
    private async Task HandleTestDriverConnectAsync(TestDriverConnect msg)
    {
        var replyTo = Sender;

        // A null DriverType would make the dictionary lookup throw outside any try/catch, failing the
        // actor without replying. Treat it like any other unregistered type instead.
        if (msg.DriverType is null || !_probesByType.TryGetValue(msg.DriverType, out var probe))
        {
            replyTo.Tell(new TestDriverConnectResult(
                false,
                $"No probe registered for driver type '{msg.DriverType}'.",
                null,
                msg.CorrelationId));
            return;
        }

        var clampedSec = Math.Clamp(msg.TimeoutSeconds, MinProbeTimeoutSeconds, MaxProbeTimeoutSeconds);
        var timeout = TimeSpan.FromSeconds(clampedSec);
        using var cts = new CancellationTokenSource(timeout);
        var sw = System.Diagnostics.Stopwatch.StartNew();
        try
        {
            // NOTE(review): this await relies on the probe honouring cts.Token. A probe that ignores
            // cancellation would keep this handler (and so this singleton) busy past the timeout.
            var result = await probe.ProbeAsync(msg.ConfigJson, timeout, cts.Token);
            sw.Stop();
            replyTo.Tell(new TestDriverConnectResult(
                result.Ok,
                result.Message,
                result.Ok ? sw.Elapsed.TotalMilliseconds : (double?)null,
                msg.CorrelationId));
        }
        catch (OperationCanceledException) when (cts.IsCancellationRequested)
        {
            // Only report a timeout when our own deadline fired; any other cancellation is
            // surfaced by the general handler below with its real message.
            replyTo.Tell(new TestDriverConnectResult(
                false,
                $"Probe timed out after {clampedSec}s.",
                null,
                msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "Probe for {DriverType} threw", msg.DriverType);
            replyTo.Tell(new TestDriverConnectResult(
                false,
                ex.Message,
                null,
                msg.CorrelationId));
        }
    }

    /// <summary>Audits and broadcasts a driver restart request; replies with a <see cref="RestartDriverResult"/>.</summary>
    private async Task HandleRestartDriverAsync(RestartDriver msg)
    {
        var replyTo = Sender;
        try
        {
            await AuditAndBroadcastDriverControlAsync(msg, "restart", msg.DriverInstanceId, msg.ActorByUserName);
            _log.Info("AdminOps: RestartDriver dispatched for {DriverInstanceId} by {User}",
                msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new RestartDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: RestartDriver failed for {DriverInstanceId}", msg.DriverInstanceId);
            replyTo.Tell(new RestartDriverResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>Audits and broadcasts a driver reconnect request; replies with a <see cref="ReconnectDriverResult"/>.</summary>
    private async Task HandleReconnectDriverAsync(ReconnectDriver msg)
    {
        var replyTo = Sender;
        try
        {
            await AuditAndBroadcastDriverControlAsync(msg, "reconnect", msg.DriverInstanceId, msg.ActorByUserName);
            _log.Info("AdminOps: ReconnectDriver dispatched for {DriverInstanceId} by {User}",
                msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new ReconnectDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: ReconnectDriver failed for {DriverInstanceId}", msg.DriverInstanceId);
            replyTo.Tell(new ReconnectDriverResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Persists a "DriverInstance" audit <see cref="ConfigEdit"/> for a driver-control operation,
    /// then publishes <paramref name="controlMessage"/> on <see cref="DriverControlTopic.Name"/>.
    /// </summary>
    /// <remarks>
    /// The audit row is saved <em>before</em> the broadcast. If the database write throws, nothing
    /// has been published, so a failure reply to the caller always means the command did not go
    /// out. Publishing first would let a database error report failure for a command that had
    /// already been dispatched.
    /// </remarks>
    /// <param name="controlMessage">The RestartDriver / ReconnectDriver message to broadcast unchanged.</param>
    /// <param name="op">Operation tag written into the audit FieldsJson ("restart" / "reconnect").</param>
    /// <param name="driverInstanceId">Target driver instance id; audited as EntityId when it parses as a GUID.</param>
    /// <param name="editedBy">User name recorded on the audit row.</param>
    private async Task AuditAndBroadcastDriverControlAsync(
        object controlMessage,
        string op,
        string driverInstanceId,
        string editedBy)
    {
        // Resolve actor-context-dependent values before the first await.
        var mediator = DistributedPubSub.Get(Context.System).Mediator;
        var sourceNode = GetSourceNode();

        await using (var db = await _dbFactory.CreateDbContextAsync())
        {
            db.ConfigEdits.Add(new ConfigEdit
            {
                EntityType = "DriverInstance",
                // Non-GUID ids fall back to Guid.Empty; the raw id is still captured in FieldsJson.
                EntityId = Guid.TryParse(driverInstanceId, out var guid) ? guid : Guid.Empty,
                FieldsJson = System.Text.Json.JsonSerializer.Serialize(new { op, driverInstanceId }),
                EditedBy = editedBy,
                SourceNode = sourceNode,
            });
            await db.SaveChangesAsync();
        }

        // Broadcast to every DriverHostActor on every node via the driver-control DPS topic.
        // Only the host that owns the instance will act; others ignore it (id not found in _children).
        mediator.Tell(new Publish(DriverControlTopic.Name, controlMessage));
    }

    /// <summary>Host of this node's cluster self-address, or "unknown" when it has none; recorded on audit rows.</summary>
    private string GetSourceNode() =>
        Akka.Cluster.Cluster.Get(Context.System).SelfAddress.Host ?? "unknown";
}