feat(controlplane): AdminOperationsActor + ConfigComposer + StartDeployment flow
This commit is contained in:
@@ -0,0 +1,108 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.Event;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Cluster-singleton admin operations actor. Owns the "snapshot the live-edit state and start
|
||||||
|
/// a deployment" workflow plus (eventually) all mutating live-edit ops invoked by the admin UI.
|
||||||
|
/// Routed to via <see cref="Commons.Interfaces.IAdminOperationsClient"/> from anywhere in the cluster.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class AdminOperationsActor : ReceiveActor
|
||||||
|
{
|
||||||
|
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
|
||||||
|
private readonly IActorRef _coordinator;
|
||||||
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||||
|
|
||||||
|
public static Props Props(
|
||||||
|
IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
|
||||||
|
IActorRef coordinator) =>
|
||||||
|
Akka.Actor.Props.Create(() => new AdminOperationsActor(dbFactory, coordinator));
|
||||||
|
|
||||||
|
public AdminOperationsActor(
|
||||||
|
IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
|
||||||
|
IActorRef coordinator)
|
||||||
|
{
|
||||||
|
_dbFactory = dbFactory;
|
||||||
|
_coordinator = coordinator;
|
||||||
|
|
||||||
|
ReceiveAsync<StartDeployment>(HandleStartDeploymentAsync);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task HandleStartDeploymentAsync(StartDeployment msg)
|
||||||
|
{
|
||||||
|
var replyTo = Sender;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await using var db = await _dbFactory.CreateDbContextAsync();
|
||||||
|
|
||||||
|
// Refuse if any deployment is already in flight — keeps the coordinator's state
|
||||||
|
// unambiguous. The UI is expected to wait for the in-flight one to seal or fail.
|
||||||
|
var inflight = await db.Deployments
|
||||||
|
.Where(d => d.Status == DeploymentStatus.Dispatching || d.Status == DeploymentStatus.AwaitingApplyAcks)
|
||||||
|
.Select(d => d.DeploymentId)
|
||||||
|
.FirstOrDefaultAsync();
|
||||||
|
if (inflight != Guid.Empty)
|
||||||
|
{
|
||||||
|
replyTo.Tell(new StartDeploymentResult(
|
||||||
|
StartDeploymentOutcome.AnotherDeploymentInFlight,
|
||||||
|
DeploymentId: new DeploymentId(inflight),
|
||||||
|
RevisionHash: null,
|
||||||
|
Message: $"Deployment {inflight:N} is still in flight.",
|
||||||
|
msg.CorrelationId));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var artifact = await ConfigComposer.SnapshotAndFlattenAsync(db);
|
||||||
|
var deploymentId = DeploymentId.NewId();
|
||||||
|
var revHash = RevisionHash.Parse(artifact.RevisionHash);
|
||||||
|
|
||||||
|
db.Deployments.Add(new Deployment
|
||||||
|
{
|
||||||
|
DeploymentId = deploymentId.Value,
|
||||||
|
RevisionHash = artifact.RevisionHash,
|
||||||
|
Status = DeploymentStatus.Dispatching,
|
||||||
|
CreatedBy = msg.CreatedBy,
|
||||||
|
ArtifactBlob = artifact.Blob,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Marker ConfigEdit row so the audit timeline shows the deployment snapshot.
|
||||||
|
db.ConfigEdits.Add(new ConfigEdit
|
||||||
|
{
|
||||||
|
EntityType = "Deployment",
|
||||||
|
EntityId = deploymentId.Value,
|
||||||
|
FieldsJson = $"{{\"revisionHash\":\"{artifact.RevisionHash}\",\"sizeBytes\":{artifact.Blob.Length}}}",
|
||||||
|
EditedBy = msg.CreatedBy,
|
||||||
|
SourceNode = Akka.Cluster.Cluster.Get(Context.System).SelfAddress.Host ?? "unknown",
|
||||||
|
});
|
||||||
|
|
||||||
|
await db.SaveChangesAsync();
|
||||||
|
|
||||||
|
_coordinator.Tell(new DispatchDeployment(deploymentId, revHash, msg.CorrelationId));
|
||||||
|
|
||||||
|
replyTo.Tell(new StartDeploymentResult(
|
||||||
|
StartDeploymentOutcome.Accepted,
|
||||||
|
deploymentId,
|
||||||
|
revHash,
|
||||||
|
Message: null,
|
||||||
|
msg.CorrelationId));
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_log.Error(ex, "StartDeployment failed for {CreatedBy}", msg.CreatedBy);
|
||||||
|
replyTo.Tell(new StartDeploymentResult(
|
||||||
|
StartDeploymentOutcome.Rejected,
|
||||||
|
DeploymentId: null,
|
||||||
|
RevisionHash: null,
|
||||||
|
Message: ex.Message,
|
||||||
|
msg.CorrelationId));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,54 @@
|
|||||||
|
using System.Security.Cryptography;
|
||||||
|
using System.Text.Json;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Pure snapshot composer: reads the current live-edit state from <see cref="OtOpcUaConfigDbContext"/>
|
||||||
|
/// and serialises it into a deterministic byte[] artifact + SHA-256 hex revision hash. Determinism
|
||||||
|
/// comes from sorting every collection by its natural key before serialising, so two snapshots over
|
||||||
|
/// the same DB state always produce the same hash regardless of EF row ordering.
|
||||||
|
/// </summary>
|
||||||
|
public static class ConfigComposer
|
||||||
|
{
|
||||||
|
public sealed record ConfigArtifact(byte[] Blob, string RevisionHash);
|
||||||
|
|
||||||
|
private static readonly JsonSerializerOptions JsonOptions = new()
|
||||||
|
{
|
||||||
|
WriteIndented = false,
|
||||||
|
DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.Never,
|
||||||
|
};
|
||||||
|
|
||||||
|
public static async Task<ConfigArtifact> SnapshotAndFlattenAsync(
|
||||||
|
OtOpcUaConfigDbContext db, CancellationToken ct = default)
|
||||||
|
{
|
||||||
|
var snapshot = new
|
||||||
|
{
|
||||||
|
Clusters = await db.ServerClusters.AsNoTracking().OrderBy(x => x.ClusterId).ToListAsync(ct),
|
||||||
|
Nodes = await db.ClusterNodes.AsNoTracking().OrderBy(x => x.NodeId).ToListAsync(ct),
|
||||||
|
DriverInstances = await db.DriverInstances.AsNoTracking().OrderBy(x => x.DriverInstanceId).ToListAsync(ct),
|
||||||
|
Devices = await db.Devices.AsNoTracking().OrderBy(x => x.DeviceId).ToListAsync(ct),
|
||||||
|
Equipment = await db.Equipment.AsNoTracking().OrderBy(x => x.EquipmentId).ToListAsync(ct),
|
||||||
|
Tags = await db.Tags.AsNoTracking().OrderBy(x => x.TagId).ToListAsync(ct),
|
||||||
|
PollGroups = await db.PollGroups.AsNoTracking().OrderBy(x => x.PollGroupId).ToListAsync(ct),
|
||||||
|
Namespaces = await db.Namespaces.AsNoTracking().OrderBy(x => x.NamespaceId).ToListAsync(ct),
|
||||||
|
UnsAreas = await db.UnsAreas.AsNoTracking().OrderBy(x => x.UnsAreaId).ToListAsync(ct),
|
||||||
|
UnsLines = await db.UnsLines.AsNoTracking().OrderBy(x => x.UnsLineId).ToListAsync(ct),
|
||||||
|
NodeAcls = await db.NodeAcls.AsNoTracking().OrderBy(x => x.NodeAclId).ToListAsync(ct),
|
||||||
|
Scripts = await db.Scripts.AsNoTracking().OrderBy(x => x.ScriptId).ToListAsync(ct),
|
||||||
|
VirtualTags = await db.VirtualTags.AsNoTracking().OrderBy(x => x.VirtualTagId).ToListAsync(ct),
|
||||||
|
ScriptedAlarms = await db.ScriptedAlarms.AsNoTracking().OrderBy(x => x.ScriptedAlarmId).ToListAsync(ct),
|
||||||
|
};
|
||||||
|
|
||||||
|
var blob = JsonSerializer.SerializeToUtf8Bytes(snapshot, JsonOptions);
|
||||||
|
var hash = Convert.ToHexStringLower(SHA256.HashData(blob));
|
||||||
|
return new ConfigArtifact(blob, hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Returns the SHA-256 hex digest of the supplied artifact bytes (lowercase, no prefix).</summary>
|
||||||
|
public static string HashOf(ReadOnlySpan<byte> blob) =>
|
||||||
|
Convert.ToHexStringLower(SHA256.HashData(blob));
|
||||||
|
}
|
||||||
@@ -0,0 +1,70 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.TestKit;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests;
|
||||||
|
|
||||||
|
public sealed class AdminOperationsActorTests : ControlPlaneActorTestBase
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void StartDeployment_inserts_deployment_and_dispatches_to_coordinator()
|
||||||
|
{
|
||||||
|
var dbFactory = NewInMemoryDbFactory();
|
||||||
|
var coordinator = CreateTestProbe("coord");
|
||||||
|
var actor = Sys.ActorOf(AdminOperationsActor.Props(dbFactory, coordinator.Ref));
|
||||||
|
|
||||||
|
actor.Tell(new StartDeployment("joe", CorrelationId.NewId()));
|
||||||
|
|
||||||
|
var dispatch = coordinator.ExpectMsg<DispatchDeployment>(TimeSpan.FromSeconds(3));
|
||||||
|
dispatch.DeploymentId.Value.ShouldNotBe(Guid.Empty);
|
||||||
|
dispatch.RevisionHash.Value.Length.ShouldBe(64);
|
||||||
|
|
||||||
|
var reply = ExpectMsg<StartDeploymentResult>(TimeSpan.FromSeconds(3));
|
||||||
|
reply.Outcome.ShouldBe(StartDeploymentOutcome.Accepted);
|
||||||
|
reply.DeploymentId.ShouldBe(dispatch.DeploymentId);
|
||||||
|
reply.RevisionHash.ShouldBe(dispatch.RevisionHash);
|
||||||
|
|
||||||
|
using var db = dbFactory.CreateDbContext();
|
||||||
|
var row = db.Deployments.Single();
|
||||||
|
row.Status.ShouldBe(DeploymentStatus.Dispatching);
|
||||||
|
row.CreatedBy.ShouldBe("joe");
|
||||||
|
row.ArtifactBlob.Length.ShouldBeGreaterThan(0);
|
||||||
|
|
||||||
|
db.ConfigEdits.Count().ShouldBe(1);
|
||||||
|
db.ConfigEdits.Single().EntityType.ShouldBe("Deployment");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void StartDeployment_refuses_when_another_is_in_flight()
|
||||||
|
{
|
||||||
|
var dbFactory = NewInMemoryDbFactory();
|
||||||
|
// Seed an in-flight Deployment.
|
||||||
|
using (var db = dbFactory.CreateDbContext())
|
||||||
|
{
|
||||||
|
db.Deployments.Add(new Configuration.Entities.Deployment
|
||||||
|
{
|
||||||
|
RevisionHash = new string('a', 64),
|
||||||
|
Status = DeploymentStatus.Dispatching,
|
||||||
|
CreatedBy = "earlier",
|
||||||
|
});
|
||||||
|
db.SaveChanges();
|
||||||
|
}
|
||||||
|
|
||||||
|
var coordinator = CreateTestProbe("coord");
|
||||||
|
var actor = Sys.ActorOf(AdminOperationsActor.Props(dbFactory, coordinator.Ref));
|
||||||
|
|
||||||
|
actor.Tell(new StartDeployment("joe", CorrelationId.NewId()));
|
||||||
|
|
||||||
|
coordinator.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
||||||
|
var reply = ExpectMsg<StartDeploymentResult>(TimeSpan.FromSeconds(3));
|
||||||
|
reply.Outcome.ShouldBe(StartDeploymentOutcome.AnotherDeploymentInFlight);
|
||||||
|
reply.DeploymentId.ShouldNotBeNull();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,98 @@
|
|||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests;
|
||||||
|
|
||||||
|
public sealed class ConfigComposerTests : ControlPlaneActorTestBase
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Empty_database_produces_stable_hash()
|
||||||
|
{
|
||||||
|
var f = NewInMemoryDbFactory();
|
||||||
|
|
||||||
|
await using var db1 = f.CreateDbContext();
|
||||||
|
var a1 = await ConfigComposer.SnapshotAndFlattenAsync(db1);
|
||||||
|
|
||||||
|
await using var db2 = f.CreateDbContext();
|
||||||
|
var a2 = await ConfigComposer.SnapshotAndFlattenAsync(db2);
|
||||||
|
|
||||||
|
a1.RevisionHash.ShouldBe(a2.RevisionHash);
|
||||||
|
a1.Blob.ShouldBe(a2.Blob);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Same_rows_in_different_insert_orders_produce_same_hash()
|
||||||
|
{
|
||||||
|
var name = Guid.NewGuid().ToString("N");
|
||||||
|
var f = NewInMemoryDbFactory(name);
|
||||||
|
|
||||||
|
await using (var db = f.CreateDbContext())
|
||||||
|
{
|
||||||
|
db.ServerClusters.Add(NewCluster("cluster-a"));
|
||||||
|
db.ServerClusters.Add(NewCluster("cluster-b"));
|
||||||
|
await db.SaveChangesAsync();
|
||||||
|
}
|
||||||
|
var hashAB = (await ConfigComposer.SnapshotAndFlattenAsync(f.CreateDbContext())).RevisionHash;
|
||||||
|
|
||||||
|
// Fresh DB, same rows in reverse insertion order.
|
||||||
|
var f2 = NewInMemoryDbFactory();
|
||||||
|
await using (var db = f2.CreateDbContext())
|
||||||
|
{
|
||||||
|
db.ServerClusters.Add(NewCluster("cluster-b"));
|
||||||
|
db.ServerClusters.Add(NewCluster("cluster-a"));
|
||||||
|
await db.SaveChangesAsync();
|
||||||
|
}
|
||||||
|
var hashBA = (await ConfigComposer.SnapshotAndFlattenAsync(f2.CreateDbContext())).RevisionHash;
|
||||||
|
|
||||||
|
hashAB.ShouldBe(hashBA);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Different_data_produces_different_hash()
|
||||||
|
{
|
||||||
|
var f = NewInMemoryDbFactory();
|
||||||
|
await using (var db = f.CreateDbContext())
|
||||||
|
{
|
||||||
|
db.ServerClusters.Add(NewCluster("cluster-a"));
|
||||||
|
await db.SaveChangesAsync();
|
||||||
|
}
|
||||||
|
var hashA = (await ConfigComposer.SnapshotAndFlattenAsync(f.CreateDbContext())).RevisionHash;
|
||||||
|
|
||||||
|
await using (var db = f.CreateDbContext())
|
||||||
|
{
|
||||||
|
db.ServerClusters.Add(NewCluster("cluster-b"));
|
||||||
|
await db.SaveChangesAsync();
|
||||||
|
}
|
||||||
|
var hashAB = (await ConfigComposer.SnapshotAndFlattenAsync(f.CreateDbContext())).RevisionHash;
|
||||||
|
|
||||||
|
hashAB.ShouldNotBe(hashA);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Hash_is_64_lowercase_hex_chars()
|
||||||
|
{
|
||||||
|
var f = NewInMemoryDbFactory();
|
||||||
|
var artifact = await ConfigComposer.SnapshotAndFlattenAsync(f.CreateDbContext());
|
||||||
|
artifact.RevisionHash.Length.ShouldBe(64);
|
||||||
|
artifact.RevisionHash.ShouldMatch("^[0-9a-f]{64}$");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static readonly DateTime FixedTimestamp = new(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
|
||||||
|
|
||||||
|
private static ServerCluster NewCluster(string id) => new()
|
||||||
|
{
|
||||||
|
ClusterId = id,
|
||||||
|
Name = id,
|
||||||
|
Enterprise = "ent",
|
||||||
|
Site = "site",
|
||||||
|
RedundancyMode = RedundancyMode.None,
|
||||||
|
CreatedBy = "test",
|
||||||
|
// Pin every timestamp so two harnesses produce byte-identical snapshots when the logical
|
||||||
|
// content matches. Production rows get real DateTime.UtcNow — divergence there is correct.
|
||||||
|
CreatedAt = FixedTimestamp,
|
||||||
|
};
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user