662f3f9f5c
v2-ci / build (push) Failing after 32s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
- Topic-name drift fix: DriverHealthChanged.TopicName and DriverControlTopic.Name now live on the message contracts in Commons. AkkaDriverHealthPublisher, DriverStatusSignalRBridge, DriverHostActor, and AdminOperationsActor all delegate to the single constant so a rename can't silently desynchronise publisher and subscriber. - DriverStatusPanel._opResultClearTimer switched from System.Timers.Timer to System.Threading.Timer + awaited DisposeAsync. Prevents an in-flight 8s clear-callback from invoking StateHasChanged on a component whose hub has already been released. - PublishHealthSnapshot deduplicates against the last published (state, lastSuccess, lastError, errorCount) fingerprint. The 30s heartbeat no longer floods the SignalR layer with identical Healthy snapshots — newly-joined clients still warm up via the snapshot store on JoinDriver.
235 lines
10 KiB
C#
235 lines
10 KiB
C#
using Akka.Actor;
|
|
using Akka.Cluster.Tools.PublishSubscribe;
|
|
using Akka.Event;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations;
|
|
|
|
/// <summary>
/// Cluster-singleton admin operations actor. Owns the "snapshot the live-edit state and start
/// a deployment" workflow plus (eventually) all mutating live-edit ops invoked by the admin UI.
/// Routed to via <see cref="Commons.Interfaces.IAdminOperationsClient"/> from anywhere in the cluster.
/// </summary>
public sealed class AdminOperationsActor : ReceiveActor
{
    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
    private readonly IActorRef _coordinator;
    private readonly IReadOnlyDictionary<string, IDriverProbe> _probesByType;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    /// <summary>Creates actor props for the admin operations actor.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">Driver probes registered in DI; keyed by DriverType (case-insensitive).</param>
    /// <returns>Props configured to create an AdminOperationsActor.</returns>
    public static Props Props(
        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
        IActorRef coordinator,
        IEnumerable<IDriverProbe> probes) =>
        Akka.Actor.Props.Create(() => new AdminOperationsActor(dbFactory, coordinator, probes));

    /// <summary>Initializes a new instance of the AdminOperationsActor.</summary>
    /// <param name="dbFactory">Factory for creating config database contexts.</param>
    /// <param name="coordinator">Reference to the deployment coordinator actor.</param>
    /// <param name="probes">
    /// Driver probes registered in DI; keyed by DriverType (case-insensitive). Two probes whose
    /// DriverType differs only by case make <c>ToDictionary</c> throw, failing actor construction.
    /// </param>
    public AdminOperationsActor(
        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
        IActorRef coordinator,
        IEnumerable<IDriverProbe> probes)
    {
        _dbFactory = dbFactory;
        _coordinator = coordinator;
        _probesByType = probes.ToDictionary(p => p.DriverType, StringComparer.OrdinalIgnoreCase);

        ReceiveAsync<StartDeployment>(HandleStartDeploymentAsync);
        ReceiveAsync<TestDriverConnect>(HandleTestDriverConnectAsync);
        ReceiveAsync<RestartDriver>(HandleRestartDriverAsync);
        ReceiveAsync<ReconnectDriver>(HandleReconnectDriverAsync);
    }

    /// <summary>
    /// Host part of this node's cluster address, recorded as <c>SourceNode</c> on audit rows;
    /// <c>"unknown"</c> when the address carries no host. Must be read while the actor is
    /// processing a message, because it goes through <c>Context</c>.
    /// </summary>
    private string SelfHost => Akka.Cluster.Cluster.Get(Context.System).SelfAddress.Host ?? "unknown";

    /// <summary>
    /// Snapshots the live-edit configuration, records a new deployment (status Dispatching) plus a
    /// marker <c>ConfigEdit</c> audit row, and hands the deployment to the coordinator.
    /// Replies to the sender with a <c>StartDeploymentResult</c>:
    /// <c>AnotherDeploymentInFlight</c> when a deployment is still Dispatching or AwaitingApplyAcks,
    /// <c>Accepted</c> on success, or <c>Rejected</c> (carrying the exception message) on any failure.
    /// </summary>
    /// <param name="msg">The start request; <c>CreatedBy</c> is recorded on both persisted rows.</param>
    private async Task HandleStartDeploymentAsync(StartDeployment msg)
    {
        // Capture the sender before the first await, so the reply goes to the requester.
        var replyTo = Sender;
        try
        {
            await using var db = await _dbFactory.CreateDbContextAsync();

            // Refuse if any deployment is already in flight — keeps the coordinator's state
            // unambiguous. The UI is expected to wait for the in-flight one to seal or fail.
            var inflight = await db.Deployments
                .Where(d => d.Status == DeploymentStatus.Dispatching || d.Status == DeploymentStatus.AwaitingApplyAcks)
                .Select(d => d.DeploymentId)
                .FirstOrDefaultAsync();
            if (inflight != Guid.Empty)
            {
                replyTo.Tell(new StartDeploymentResult(
                    StartDeploymentOutcome.AnotherDeploymentInFlight,
                    DeploymentId: new DeploymentId(inflight),
                    RevisionHash: null,
                    Message: $"Deployment {inflight:N} is still in flight.",
                    msg.CorrelationId));
                return;
            }

            var artifact = await ConfigComposer.SnapshotAndFlattenAsync(db);
            var deploymentId = DeploymentId.NewId();
            // Parse before anything is written, so a malformed hash aborts with nothing persisted.
            var revHash = RevisionHash.Parse(artifact.RevisionHash);

            db.Deployments.Add(new Deployment
            {
                DeploymentId = deploymentId.Value,
                RevisionHash = artifact.RevisionHash,
                Status = DeploymentStatus.Dispatching,
                CreatedBy = msg.CreatedBy,
                ArtifactBlob = artifact.Blob,
            });

            // Marker ConfigEdit row so the audit timeline shows the deployment snapshot.
            // The hash is JSON-encoded rather than spliced in raw, so the payload is always valid JSON.
            db.ConfigEdits.Add(new ConfigEdit
            {
                EntityType = "Deployment",
                EntityId = deploymentId.Value,
                FieldsJson = $"{{\"revisionHash\":{System.Text.Json.JsonSerializer.Serialize(artifact.RevisionHash)},\"sizeBytes\":{artifact.Blob.Length}}}",
                EditedBy = msg.CreatedBy,
                SourceNode = SelfHost,
            });

            await db.SaveChangesAsync();

            // Dispatch only after the rows are durable, so the coordinator never sees an unrecorded deployment.
            _coordinator.Tell(new DispatchDeployment(deploymentId, revHash, msg.CorrelationId));

            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Accepted,
                deploymentId,
                revHash,
                Message: null,
                msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "StartDeployment failed for {CreatedBy}", msg.CreatedBy);
            replyTo.Tell(new StartDeploymentResult(
                StartDeploymentOutcome.Rejected,
                DeploymentId: null,
                RevisionHash: null,
                Message: ex.Message,
                msg.CorrelationId));
        }
    }

    /// <summary>
    /// Runs the registered <c>IDriverProbe</c> for <c>msg.DriverType</c> against the supplied config
    /// and replies with a <c>TestDriverConnectResult</c>. The timeout is clamped to 1–60 seconds.
    /// Elapsed milliseconds are reported only for a successful probe.
    /// </summary>
    /// <param name="msg">The probe request (driver type, config JSON, timeout, correlation id).</param>
    private async Task HandleTestDriverConnectAsync(TestDriverConnect msg)
    {
        var replyTo = Sender;

        // A null key would make TryGetValue throw outside any try block; that exception would escape
        // the handler and the caller would never get a reply. Treat null like an unregistered type.
        if (msg.DriverType is null || !_probesByType.TryGetValue(msg.DriverType, out var probe))
        {
            replyTo.Tell(new TestDriverConnectResult(
                false,
                $"No probe registered for driver type '{msg.DriverType}'.",
                null,
                msg.CorrelationId));
            return;
        }

        var clampedSec = Math.Clamp(msg.TimeoutSeconds, 1, 60);
        var timeout = TimeSpan.FromSeconds(clampedSec);
        using var cts = new CancellationTokenSource(timeout);

        var sw = System.Diagnostics.Stopwatch.StartNew();
        try
        {
            var result = await probe.ProbeAsync(msg.ConfigJson, timeout, cts.Token);
            sw.Stop();
            replyTo.Tell(new TestDriverConnectResult(
                result.Ok,
                result.Message,
                result.Ok ? sw.Elapsed.TotalMilliseconds : (double?)null,
                msg.CorrelationId));
        }
        catch (OperationCanceledException) when (cts.IsCancellationRequested)
        {
            // Only our own deadline counts as a timeout. Any other cancellation (e.g. raised inside
            // the probe) falls through to the general handler and reports its real message.
            _log.Warning("Probe for {DriverType} timed out after {TimeoutSeconds}s", msg.DriverType, clampedSec);
            replyTo.Tell(new TestDriverConnectResult(
                false,
                $"Probe timed out after {clampedSec}s.",
                null,
                msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "Probe for {DriverType} threw", msg.DriverType);
            replyTo.Tell(new TestDriverConnectResult(
                false,
                ex.Message,
                null,
                msg.CorrelationId));
        }
    }

    /// <summary>
    /// Audits and broadcasts a driver restart request, then replies with a <c>RestartDriverResult</c>
    /// (success, or failure carrying the exception message).
    /// </summary>
    /// <param name="msg">The restart request; <c>ActorByUserName</c> is recorded as the editor.</param>
    private async Task HandleRestartDriverAsync(RestartDriver msg)
    {
        var replyTo = Sender;
        try
        {
            await AuditAndBroadcastDriverControlAsync(msg, "restart", msg.DriverInstanceId, msg.ActorByUserName);

            _log.Info("AdminOps: RestartDriver dispatched for {DriverInstanceId} by {User}",
                msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new RestartDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: RestartDriver failed for {DriverInstanceId}", msg.DriverInstanceId);
            replyTo.Tell(new RestartDriverResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Audits and broadcasts a driver reconnect request, then replies with a <c>ReconnectDriverResult</c>
    /// (success, or failure carrying the exception message).
    /// </summary>
    /// <param name="msg">The reconnect request; <c>ActorByUserName</c> is recorded as the editor.</param>
    private async Task HandleReconnectDriverAsync(ReconnectDriver msg)
    {
        var replyTo = Sender;
        try
        {
            await AuditAndBroadcastDriverControlAsync(msg, "reconnect", msg.DriverInstanceId, msg.ActorByUserName);

            _log.Info("AdminOps: ReconnectDriver dispatched for {DriverInstanceId} by {User}",
                msg.DriverInstanceId, msg.ActorByUserName);
            replyTo.Tell(new ReconnectDriverResult(true, null, msg.CorrelationId));
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AdminOps: ReconnectDriver failed for {DriverInstanceId}", msg.DriverInstanceId);
            replyTo.Tell(new ReconnectDriverResult(false, ex.Message, msg.CorrelationId));
        }
    }

    /// <summary>
    /// Shared restart/reconnect path: saves a <c>ConfigEdit</c> audit row for the driver instance, then
    /// publishes <paramref name="command"/> on <c>DriverControlTopic.Name</c>. The row is saved first,
    /// so if this method throws, nothing has been broadcast. A failure reply therefore never
    /// accompanies an operation that was actually dispatched (and would be repeated on retry).
    /// </summary>
    /// <param name="command">The original control message, published unchanged to every node.</param>
    /// <param name="op">Operation tag written into the audit JSON (<c>"restart"</c> / <c>"reconnect"</c>).</param>
    /// <param name="driverInstanceId">Target instance id; stored as EntityId when it parses as a GUID, else Guid.Empty.</param>
    /// <param name="editedBy">User name recorded on the audit row.</param>
    private async Task AuditAndBroadcastDriverControlAsync(
        object command,
        string op,
        string driverInstanceId,
        string editedBy)
    {
        await using (var db = await _dbFactory.CreateDbContextAsync())
        {
            db.ConfigEdits.Add(new ConfigEdit
            {
                EntityType = "DriverInstance",
                // Non-GUID ids are still audited under Guid.Empty; the raw id is preserved in FieldsJson.
                EntityId = Guid.TryParse(driverInstanceId, out var guid) ? guid : Guid.Empty,
                FieldsJson = $"{{\"op\":\"{op}\",\"driverInstanceId\":{System.Text.Json.JsonSerializer.Serialize(driverInstanceId)}}}",
                EditedBy = editedBy,
                SourceNode = SelfHost,
            });
            await db.SaveChangesAsync();
        }

        // Broadcast to every DriverHostActor on every node via the driver-control DPS topic;
        // presumably only the host owning the instance acts on it (receiver side not visible here).
        DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DriverControlTopic.Name, command));
    }
}
|