From ef683f50731c2d1210bdce05a12b74f3c3a83324 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 26 May 2026 04:53:28 -0400 Subject: [PATCH] feat(controlplane): AdminOperationsActor + ConfigComposer + StartDeployment flow --- .../AdminOperations/AdminOperationsActor.cs | 108 ++++++++++++++++++ .../AdminOperations/ConfigComposer.cs | 54 +++++++++ .../AdminOperationsActorTests.cs | 70 ++++++++++++ .../ConfigComposerTests.cs | 98 ++++++++++++++++ 4 files changed, 330 insertions(+) create mode 100644 src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/AdminOperations/AdminOperationsActor.cs create mode 100644 src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/AdminOperations/ConfigComposer.cs create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/AdminOperationsActorTests.cs create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ConfigComposerTests.cs diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/AdminOperations/AdminOperationsActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/AdminOperations/AdminOperationsActor.cs new file mode 100644 index 0000000..d102071 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/AdminOperations/AdminOperationsActor.cs @@ -0,0 +1,108 @@ +using Akka.Actor; +using Akka.Event; +using Microsoft.EntityFrameworkCore; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; + +namespace ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations; + +/// +/// Cluster-singleton admin operations actor. Owns the "snapshot the live-edit state and start +/// a deployment" workflow plus (eventually) all mutating live-edit ops invoked by the admin UI. +/// Routed to via from anywhere in the cluster. +/// +public sealed class AdminOperationsActor : ReceiveActor +{ + private readonly IDbContextFactory _dbFactory; + private readonly IActorRef _coordinator; + private readonly ILoggingAdapter _log = Context.GetLogger(); + + public static Props Props( + IDbContextFactory dbFactory, + IActorRef coordinator) => + Akka.Actor.Props.Create(() => new AdminOperationsActor(dbFactory, coordinator)); + + public AdminOperationsActor( + IDbContextFactory dbFactory, + IActorRef coordinator) + { + _dbFactory = dbFactory; + _coordinator = coordinator; + + ReceiveAsync(HandleStartDeploymentAsync); + } + + private async Task HandleStartDeploymentAsync(StartDeployment msg) + { + var replyTo = Sender; + try + { + await using var db = await _dbFactory.CreateDbContextAsync(); + + // Refuse if any deployment is already in flight — keeps the coordinator's state + // unambiguous. The UI is expected to wait for the in-flight one to seal or fail. + var inflight = await db.Deployments + .Where(d => d.Status == DeploymentStatus.Dispatching || d.Status == DeploymentStatus.AwaitingApplyAcks) + .Select(d => d.DeploymentId) + .FirstOrDefaultAsync(); + if (inflight != Guid.Empty) + { + replyTo.Tell(new StartDeploymentResult( + StartDeploymentOutcome.AnotherDeploymentInFlight, + DeploymentId: new DeploymentId(inflight), + RevisionHash: null, + Message: $"Deployment {inflight:N} is still in flight.", + msg.CorrelationId)); + return; + } + + var artifact = await ConfigComposer.SnapshotAndFlattenAsync(db); + var deploymentId = DeploymentId.NewId(); + var revHash = RevisionHash.Parse(artifact.RevisionHash); + + db.Deployments.Add(new Deployment + { + DeploymentId = deploymentId.Value, + RevisionHash = artifact.RevisionHash, + Status = DeploymentStatus.Dispatching, + CreatedBy = msg.CreatedBy, + ArtifactBlob = artifact.Blob, + }); + + // Marker ConfigEdit row so the audit timeline shows the deployment snapshot. + db.ConfigEdits.Add(new ConfigEdit + { + EntityType = "Deployment", + EntityId = deploymentId.Value, + FieldsJson = $"{{\"revisionHash\":\"{artifact.RevisionHash}\",\"sizeBytes\":{artifact.Blob.Length}}}", + EditedBy = msg.CreatedBy, + SourceNode = Akka.Cluster.Cluster.Get(Context.System).SelfAddress.Host ?? "unknown", + }); + + await db.SaveChangesAsync(); + + _coordinator.Tell(new DispatchDeployment(deploymentId, revHash, msg.CorrelationId)); + + replyTo.Tell(new StartDeploymentResult( + StartDeploymentOutcome.Accepted, + deploymentId, + revHash, + Message: null, + msg.CorrelationId)); + } + catch (Exception ex) + { + _log.Error(ex, "StartDeployment failed for {CreatedBy}", msg.CreatedBy); + replyTo.Tell(new StartDeploymentResult( + StartDeploymentOutcome.Rejected, + DeploymentId: null, + RevisionHash: null, + Message: ex.Message, + msg.CorrelationId)); + } + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/AdminOperations/ConfigComposer.cs b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/AdminOperations/ConfigComposer.cs new file mode 100644 index 0000000..a66c75c --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/AdminOperations/ConfigComposer.cs @@ -0,0 +1,54 @@ +using System.Security.Cryptography; +using System.Text.Json; +using Microsoft.EntityFrameworkCore; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; + +namespace ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations; + +/// +/// Pure snapshot composer: reads the current live-edit state from +/// and serialises it into a deterministic byte[] artifact + SHA-256 hex revision hash. Determinism +/// comes from sorting every collection by its natural key before serialising, so two snapshots over +/// the same DB state always produce the same hash regardless of EF row ordering. +/// +public static class ConfigComposer +{ + public sealed record ConfigArtifact(byte[] Blob, string RevisionHash); + + private static readonly JsonSerializerOptions JsonOptions = new() + { + WriteIndented = false, + DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.Never, + }; + + public static async Task SnapshotAndFlattenAsync( + OtOpcUaConfigDbContext db, CancellationToken ct = default) + { + var snapshot = new + { + Clusters = await db.ServerClusters.AsNoTracking().OrderBy(x => x.ClusterId).ToListAsync(ct), + Nodes = await db.ClusterNodes.AsNoTracking().OrderBy(x => x.NodeId).ToListAsync(ct), + DriverInstances = await db.DriverInstances.AsNoTracking().OrderBy(x => x.DriverInstanceId).ToListAsync(ct), + Devices = await db.Devices.AsNoTracking().OrderBy(x => x.DeviceId).ToListAsync(ct), + Equipment = await db.Equipment.AsNoTracking().OrderBy(x => x.EquipmentId).ToListAsync(ct), + Tags = await db.Tags.AsNoTracking().OrderBy(x => x.TagId).ToListAsync(ct), + PollGroups = await db.PollGroups.AsNoTracking().OrderBy(x => x.PollGroupId).ToListAsync(ct), + Namespaces = await db.Namespaces.AsNoTracking().OrderBy(x => x.NamespaceId).ToListAsync(ct), + UnsAreas = await db.UnsAreas.AsNoTracking().OrderBy(x => x.UnsAreaId).ToListAsync(ct), + UnsLines = await db.UnsLines.AsNoTracking().OrderBy(x => x.UnsLineId).ToListAsync(ct), + NodeAcls = await db.NodeAcls.AsNoTracking().OrderBy(x => x.NodeAclId).ToListAsync(ct), + Scripts = await db.Scripts.AsNoTracking().OrderBy(x => x.ScriptId).ToListAsync(ct), + VirtualTags = await db.VirtualTags.AsNoTracking().OrderBy(x => x.VirtualTagId).ToListAsync(ct), + ScriptedAlarms = await db.ScriptedAlarms.AsNoTracking().OrderBy(x => x.ScriptedAlarmId).ToListAsync(ct), + }; + + var blob = JsonSerializer.SerializeToUtf8Bytes(snapshot, JsonOptions); + var hash = Convert.ToHexStringLower(SHA256.HashData(blob)); + return new ConfigArtifact(blob, hash); + } + + /// Returns the SHA-256 hex digest of the supplied artifact bytes (lowercase, no prefix). + public static string HashOf(ReadOnlySpan blob) => + Convert.ToHexStringLower(SHA256.HashData(blob)); +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/AdminOperationsActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/AdminOperationsActorTests.cs new file mode 100644 index 0000000..1b7b32b --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/AdminOperationsActorTests.cs @@ -0,0 +1,70 @@ +using Akka.Actor; +using Akka.TestKit; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations; +using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests; + +public sealed class AdminOperationsActorTests : ControlPlaneActorTestBase +{ + [Fact] + public void StartDeployment_inserts_deployment_and_dispatches_to_coordinator() + { + var dbFactory = NewInMemoryDbFactory(); + var coordinator = CreateTestProbe("coord"); + var actor = Sys.ActorOf(AdminOperationsActor.Props(dbFactory, coordinator.Ref)); + + actor.Tell(new StartDeployment("joe", CorrelationId.NewId())); + + var dispatch = coordinator.ExpectMsg(TimeSpan.FromSeconds(3)); + dispatch.DeploymentId.Value.ShouldNotBe(Guid.Empty); + dispatch.RevisionHash.Value.Length.ShouldBe(64); + + var reply = ExpectMsg(TimeSpan.FromSeconds(3)); + reply.Outcome.ShouldBe(StartDeploymentOutcome.Accepted); + reply.DeploymentId.ShouldBe(dispatch.DeploymentId); + reply.RevisionHash.ShouldBe(dispatch.RevisionHash); + + using var db = dbFactory.CreateDbContext(); + var row = db.Deployments.Single(); + row.Status.ShouldBe(DeploymentStatus.Dispatching); + row.CreatedBy.ShouldBe("joe"); + row.ArtifactBlob.Length.ShouldBeGreaterThan(0); + + db.ConfigEdits.Count().ShouldBe(1); + db.ConfigEdits.Single().EntityType.ShouldBe("Deployment"); + } + + [Fact] + public void StartDeployment_refuses_when_another_is_in_flight() + { + var dbFactory = NewInMemoryDbFactory(); + // Seed an in-flight Deployment. + using (var db = dbFactory.CreateDbContext()) + { + db.Deployments.Add(new Configuration.Entities.Deployment + { + RevisionHash = new string('a', 64), + Status = DeploymentStatus.Dispatching, + CreatedBy = "earlier", + }); + db.SaveChanges(); + } + + var coordinator = CreateTestProbe("coord"); + var actor = Sys.ActorOf(AdminOperationsActor.Props(dbFactory, coordinator.Ref)); + + actor.Tell(new StartDeployment("joe", CorrelationId.NewId())); + + coordinator.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + var reply = ExpectMsg(TimeSpan.FromSeconds(3)); + reply.Outcome.ShouldBe(StartDeploymentOutcome.AnotherDeploymentInFlight); + reply.DeploymentId.ShouldNotBeNull(); + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ConfigComposerTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ConfigComposerTests.cs new file mode 100644 index 0000000..2eeb37d --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ConfigComposerTests.cs @@ -0,0 +1,98 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations; +using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests; + +public sealed class ConfigComposerTests : ControlPlaneActorTestBase +{ + [Fact] + public async Task Empty_database_produces_stable_hash() + { + var f = NewInMemoryDbFactory(); + + await using var db1 = f.CreateDbContext(); + var a1 = await ConfigComposer.SnapshotAndFlattenAsync(db1); + + await using var db2 = f.CreateDbContext(); + var a2 = await ConfigComposer.SnapshotAndFlattenAsync(db2); + + a1.RevisionHash.ShouldBe(a2.RevisionHash); + a1.Blob.ShouldBe(a2.Blob); + } + + [Fact] + public async Task Same_rows_in_different_insert_orders_produce_same_hash() + { + var name = Guid.NewGuid().ToString("N"); + var f = NewInMemoryDbFactory(name); + + await using (var db = f.CreateDbContext()) + { + db.ServerClusters.Add(NewCluster("cluster-a")); + db.ServerClusters.Add(NewCluster("cluster-b")); + await db.SaveChangesAsync(); + } + var hashAB = (await ConfigComposer.SnapshotAndFlattenAsync(f.CreateDbContext())).RevisionHash; + + // Fresh DB, same rows in reverse insertion order. + var f2 = NewInMemoryDbFactory(); + await using (var db = f2.CreateDbContext()) + { + db.ServerClusters.Add(NewCluster("cluster-b")); + db.ServerClusters.Add(NewCluster("cluster-a")); + await db.SaveChangesAsync(); + } + var hashBA = (await ConfigComposer.SnapshotAndFlattenAsync(f2.CreateDbContext())).RevisionHash; + + hashAB.ShouldBe(hashBA); + } + + [Fact] + public async Task Different_data_produces_different_hash() + { + var f = NewInMemoryDbFactory(); + await using (var db = f.CreateDbContext()) + { + db.ServerClusters.Add(NewCluster("cluster-a")); + await db.SaveChangesAsync(); + } + var hashA = (await ConfigComposer.SnapshotAndFlattenAsync(f.CreateDbContext())).RevisionHash; + + await using (var db = f.CreateDbContext()) + { + db.ServerClusters.Add(NewCluster("cluster-b")); + await db.SaveChangesAsync(); + } + var hashAB = (await ConfigComposer.SnapshotAndFlattenAsync(f.CreateDbContext())).RevisionHash; + + hashAB.ShouldNotBe(hashA); + } + + [Fact] + public async Task Hash_is_64_lowercase_hex_chars() + { + var f = NewInMemoryDbFactory(); + var artifact = await ConfigComposer.SnapshotAndFlattenAsync(f.CreateDbContext()); + artifact.RevisionHash.Length.ShouldBe(64); + artifact.RevisionHash.ShouldMatch("^[0-9a-f]{64}$"); + } + + private static readonly DateTime FixedTimestamp = new(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc); + + private static ServerCluster NewCluster(string id) => new() + { + ClusterId = id, + Name = id, + Enterprise = "ent", + Site = "site", + RedundancyMode = RedundancyMode.None, + CreatedBy = "test", + // Pin every timestamp so two harnesses produce byte-identical snapshots when the logical + // content matches. Production rows get real DateTime.UtcNow — divergence there is correct. + CreatedAt = FixedTimestamp, + }; +}