From 7e22e2250c7602421898359286e485ccaab9b69b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 26 May 2026 09:55:11 -0400 Subject: [PATCH] =?UTF-8?q?feat(runtime):=20#109=20OpcUaPublishActor=20?= =?UTF-8?q?=E2=80=94=20load=20artifact,=20compose,=20plan-diff,=20apply?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the loop between F10b (SDK NodeManager) and F14 (Phase7Plan + Phase7Applier). DriverHostActor's successful apply now triggers a RebuildAddressSpace on the publish actor, which loads the latest deployment artifact + walks composer → planner → applier through the sink. The OPC UA address space tracks the deployed composition. DeploymentArtifact: - New ParseComposition(blob) → Phase7CompositionResult that decodes Equipment + DriverInstance + ScriptedAlarm arrays into the projection records Phase7Planner consumes. Pascal-case property names mirror ConfigComposer.SnapshotAndFlattenAsync's output. - Each entity reader is tolerant: missing-id rows are dropped, natural-key sort matches Phase7Composer's contract. OpcUaPublishActor: - New Props params: dbFactory + applier. When wired, RebuildAddressSpace does: 1. LoadLatestArtifact (most recent Sealed Deployment.ArtifactBlob) 2. ParseComposition → Phase7CompositionResult 3. Phase7Planner.Compute(lastApplied, next) → Phase7Plan 4. Empty plan ⇒ no-op (deploy of unchanged composition is benign) 5. applier.Apply(plan) drives sink.RebuildAddressSpace + WriteAlarmState for removed nodes 6. lastApplied = next so the next rebuild diffs forward - Without dbFactory/applier wiring, falls back to raw sink.RebuildAddressSpace — the dev/Mac path before #108 binds prod. DriverHostActor: - New Props param opcUaPublishActor (IActorRef?). After successful ApplyAndAck (status Applied, ACK sent), tells the publish actor RebuildAddressSpace with the same correlation id so the audit trail threads through. Null publish actor ⇒ no trigger (admin-only nodes). Tests: Runtime 63 -> 69 (+6): - ParseComposition reads Equipment/Driver/Alarm sorted by natural key - ParseComposition returns empty for empty blob - Rebuild with dbFactory + sealed deployment artifact triggers exactly one sink.Rebuild call (Equipment topology added) - Rebuild with no artifact is idempotent no-op - Second rebuild with same composition is empty-plan no-op - Rebuild without dbFactory falls back to raw sink.Rebuild (legacy path) All 6 v2 test suites green: 173 tests passing. Closes #109. Engine-wiring data flow is now end-to-end through: Deploy → DriverHostActor.ApplyAndAck → driver spawn + ACK + RebuildAddressSpace → OpcUaPublishActor → Phase7Applier → SDK NodeManager → subscribed OPC UA clients see the change. --- .../Drivers/DeploymentArtifact.cs | 95 ++++++++++++ .../Drivers/DriverHostActor.cs | 14 +- .../OpcUa/OpcUaPublishActor.cs | 90 +++++++++-- .../Drivers/DeploymentArtifactTests.cs | 42 +++++ .../OpcUa/OpcUaPublishActorRebuildTests.cs | 144 ++++++++++++++++++ 5 files changed, 372 insertions(+), 13 deletions(-) create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorRebuildTests.cs diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DeploymentArtifact.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DeploymentArtifact.cs index 853e7c7..8cfecd8 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DeploymentArtifact.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DeploymentArtifact.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using ZB.MOM.WW.OtOpcUa.OpcUaServer; namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers; @@ -75,4 +76,98 @@ public static class DeploymentArtifact Enabled: enabled, DriverConfig: config ?? "{}"); } + + /// + /// Parse the artifact into the projected used by + /// Phase7Planner + Phase7Applier. Returns an empty composition for empty/ + /// malformed blobs so callers can treat parse failure as a no-op deploy. + /// + /// The artifact JSON is produced by ConfigComposer.SnapshotAndFlattenAsync in the + /// ControlPlane — its Pascal-case property names match the EF entities. We only need a + /// subset of fields per entity class to drive the address-space rebuild on driver-role + /// nodes. + /// + public static Phase7CompositionResult ParseComposition(ReadOnlySpan blob) + { + if (blob.IsEmpty) + { + return new Phase7CompositionResult( + Array.Empty(), + Array.Empty(), + Array.Empty()); + } + + try + { + using var doc = JsonDocument.Parse(blob.ToArray()); + var root = doc.RootElement; + + var equipment = ReadArray(root, "Equipment", ReadEquipmentNode); + var drivers = ReadArray(root, "DriverInstances", ReadDriverPlan); + var alarms = ReadArray(root, "ScriptedAlarms", ReadAlarmPlan); + + return new Phase7CompositionResult(equipment, drivers, alarms); + } + catch (JsonException) + { + return new Phase7CompositionResult( + Array.Empty(), + Array.Empty(), + Array.Empty()); + } + } + + private static IReadOnlyList ReadArray(JsonElement root, string propertyName, Func reader) + where T : class + { + if (!root.TryGetProperty(propertyName, out var arr) || arr.ValueKind != JsonValueKind.Array) + return Array.Empty(); + + var result = new List(arr.GetArrayLength()); + foreach (var el in arr.EnumerateArray()) + { + if (el.ValueKind != JsonValueKind.Object) continue; + var item = reader(el); + if (item is not null) result.Add(item); + } + // Match Phase7Composer's natural-key sort so plan diffs are deterministic across + // artifact-decode + composer-compose passes. + return result.OrderBy(IdentityOf, StringComparer.Ordinal).ToList(); + } + + private static string IdentityOf(T item) where T : class => item switch + { + EquipmentNode e => e.EquipmentId, + DriverInstancePlan d => d.DriverInstanceId, + ScriptedAlarmPlan a => a.ScriptedAlarmId, + _ => string.Empty, + }; + + private static EquipmentNode? ReadEquipmentNode(JsonElement el) + { + var id = el.TryGetProperty("EquipmentId", out var idEl) ? idEl.GetString() : null; + var displayName = el.TryGetProperty("MachineCode", out var mcEl) ? mcEl.GetString() : null; + var lineId = el.TryGetProperty("UnsLineId", out var lineEl) ? lineEl.GetString() : null; + if (string.IsNullOrWhiteSpace(id)) return null; + return new EquipmentNode(id!, displayName ?? id!, lineId ?? string.Empty); + } + + private static DriverInstancePlan? ReadDriverPlan(JsonElement el) + { + var id = el.TryGetProperty("DriverInstanceId", out var idEl) ? idEl.GetString() : null; + var type = el.TryGetProperty("DriverType", out var typeEl) ? typeEl.GetString() : null; + var config = el.TryGetProperty("DriverConfig", out var cfgEl) ? cfgEl.GetString() : null; + if (string.IsNullOrWhiteSpace(id) || string.IsNullOrWhiteSpace(type)) return null; + return new DriverInstancePlan(id!, type!, config ?? "{}"); + } + + private static ScriptedAlarmPlan? ReadAlarmPlan(JsonElement el) + { + var id = el.TryGetProperty("ScriptedAlarmId", out var idEl) ? idEl.GetString() : null; + var equipmentId = el.TryGetProperty("EquipmentId", out var eqEl) ? eqEl.GetString() : null; + var script = el.TryGetProperty("PredicateScriptId", out var scEl) ? scEl.GetString() : null; + var template = el.TryGetProperty("MessageTemplate", out var tmEl) ? tmEl.GetString() : null; + if (string.IsNullOrWhiteSpace(id)) return null; + return new ScriptedAlarmPlan(id!, equipmentId ?? string.Empty, script ?? string.Empty, template ?? string.Empty); + } } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index 28af2ae..1934ccd 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -42,6 +42,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers private readonly IDriverFactory _driverFactory; private readonly IReadOnlySet _localRoles; private readonly IActorRef? _dependencyMux; + private readonly IActorRef? _opcUaPublishActor; private readonly ILoggingAdapter _log = Context.GetLogger(); private RevisionHash? _currentRevision; @@ -65,9 +66,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers IActorRef? coordinator = null, IDriverFactory? driverFactory = null, IReadOnlySet? localRoles = null, - IActorRef? dependencyMux = null) => + IActorRef? dependencyMux = null, + IActorRef? opcUaPublishActor = null) => Akka.Actor.Props.Create(() => new DriverHostActor( - dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux)); + dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor)); public DriverHostActor( IDbContextFactory dbFactory, @@ -75,7 +77,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers IActorRef? coordinator, IDriverFactory? driverFactory = null, IReadOnlySet? localRoles = null, - IActorRef? dependencyMux = null) + IActorRef? dependencyMux = null, + IActorRef? opcUaPublishActor = null) { _dbFactory = dbFactory; _localNode = localNode; @@ -83,6 +86,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers _driverFactory = driverFactory ?? NullDriverFactory.Instance; _localRoles = localRoles ?? new HashSet(StringComparer.Ordinal); _dependencyMux = dependencyMux; + _opcUaPublishActor = opcUaPublishActor; // Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply. Become(Steady); @@ -244,6 +248,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers _currentRevision = revision; UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null); SendAck(deploymentId, ApplyAckOutcome.Applied, failureReason: null, correlation); + // Trigger the OPC UA address-space rebuild so the local SDK reflects the new + // composition. The publish actor handles the load-compose-diff-apply pipeline; we + // just forward the same correlation id so the audit trail joins up. + _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation)); _log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})", _localNode, deploymentId, revision, _children.Count); } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs index 05f8f99..2c24826 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs @@ -1,9 +1,13 @@ using Akka.Actor; using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; +using Microsoft.EntityFrameworkCore; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.OpcUaServer; +using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; @@ -33,26 +37,38 @@ public sealed class OpcUaPublishActor : ReceiveActor private readonly IServiceLevelPublisher _serviceLevel; private readonly bool _subscribeRedundancyTopic; private readonly NodeId? _localNode; + private readonly IDbContextFactory? _dbFactory; + private readonly Phase7Applier? _applier; private readonly ILoggingAdapter _log = Context.GetLogger(); private int _writes; private byte _lastServiceLevel; + private Phase7CompositionResult _lastApplied = new( + Array.Empty(), + Array.Empty(), + Array.Empty()); public int WriteCount => _writes; public byte LastServiceLevel => _lastServiceLevel; /// Production Props — pins the OPC UA dispatcher + subscribes to the /// redundancy-state DPS topic so cluster transitions drive the local ServiceLevel - /// publish path. + /// publish path. When + are supplied, + /// reads the latest deployment artifact + drives the + /// applier through the sink. public static Props Props( IOpcUaAddressSpaceSink? sink = null, IServiceLevelPublisher? serviceLevel = null, - NodeId? localNode = null) => + NodeId? localNode = null, + IDbContextFactory? dbFactory = null, + Phase7Applier? applier = null) => Akka.Actor.Props.Create(() => new OpcUaPublishActor( sink ?? NullOpcUaAddressSpaceSink.Instance, serviceLevel ?? NullServiceLevelPublisher.Instance, subscribeRedundancyTopic: true, - localNode)).WithDispatcher(DispatcherId); + localNode, + dbFactory, + applier)).WithDispatcher(DispatcherId); /// Test-only Props that omits the pinned-dispatcher requirement and skips the /// DPS subscribe so unit tests can spin up the actor on a vanilla TestKit cluster. @@ -60,23 +76,31 @@ public sealed class OpcUaPublishActor : ReceiveActor IOpcUaAddressSpaceSink? sink = null, IServiceLevelPublisher? serviceLevel = null, bool subscribeRedundancyTopic = false, - NodeId? localNode = null) => + NodeId? localNode = null, + IDbContextFactory? dbFactory = null, + Phase7Applier? applier = null) => Akka.Actor.Props.Create(() => new OpcUaPublishActor( sink ?? NullOpcUaAddressSpaceSink.Instance, serviceLevel ?? NullServiceLevelPublisher.Instance, subscribeRedundancyTopic, - localNode)); + localNode, + dbFactory, + applier)); public OpcUaPublishActor( IOpcUaAddressSpaceSink sink, IServiceLevelPublisher serviceLevel, bool subscribeRedundancyTopic, - NodeId? localNode) + NodeId? localNode, + IDbContextFactory? dbFactory = null, + Phase7Applier? applier = null) { _sink = sink; _serviceLevel = serviceLevel; _subscribeRedundancyTopic = subscribeRedundancyTopic; _localNode = localNode; + _dbFactory = dbFactory; + _applier = applier; Receive(HandleAttributeUpdate); Receive(HandleAlarmUpdate); @@ -122,15 +146,61 @@ public sealed class OpcUaPublishActor : ReceiveActor private void HandleRebuild(RebuildAddressSpace msg) { + // Two modes: when dbFactory + applier are wired, do a real diff-and-apply pass against + // the latest deployment artifact. Without them, fall back to a raw sink rebuild — the + // F10b/dev path before the integration completes. + if (_dbFactory is null || _applier is null) + { + try { _sink.RebuildAddressSpace(); } + catch (Exception ex) + { + _log.Error(ex, "OpcUaPublish: sink.RebuildAddressSpace threw (correlation={Correlation})", + msg.Correlation); + } + return; + } + try { - _sink.RebuildAddressSpace(); - _log.Info("OpcUaPublish: address-space rebuilt (correlation={Correlation})", msg.Correlation); + var artifact = LoadLatestArtifact(); + var composition = DeploymentArtifact.ParseComposition(artifact); + var plan = Phase7Planner.Compute(_lastApplied, composition); + + if (plan.IsEmpty) + { + _log.Debug("OpcUaPublish: rebuild requested but plan is empty (correlation={Correlation})", + msg.Correlation); + return; + } + + var outcome = _applier.Apply(plan); + _lastApplied = composition; + _log.Info("OpcUaPublish: applied rebuild (correlation={Correlation}, added={Added}, removed={Removed}, changed={Changed}, rebuild={Rebuild})", + msg.Correlation, outcome.AddedNodes, outcome.RemovedNodes, outcome.ChangedNodes, outcome.RebuildCalled); } catch (Exception ex) { - _log.Error(ex, "OpcUaPublish: sink.RebuildAddressSpace threw (correlation={Correlation})", - msg.Correlation); + _log.Error(ex, "OpcUaPublish: rebuild pipeline threw (correlation={Correlation})", msg.Correlation); + } + } + + /// Read the most recent Sealed deployment's artifact blob from ConfigDb. + /// Empty array on any failure — the parser treats empty blob as "no composition". + private byte[] LoadLatestArtifact() + { + try + { + using var db = _dbFactory!.CreateDbContext(); + return db.Deployments.AsNoTracking() + .Where(d => d.Status == Configuration.Enums.DeploymentStatus.Sealed) + .OrderByDescending(d => d.SealedAtUtc) + .Select(d => d.ArtifactBlob) + .FirstOrDefault() ?? Array.Empty(); + } + catch (Exception ex) + { + _log.Warning(ex, "OpcUaPublish: failed to load latest deployment artifact; rebuild becomes no-op"); + return Array.Empty(); } } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DeploymentArtifactTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DeploymentArtifactTests.cs index a4bbd36..6c715cb 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DeploymentArtifactTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DeploymentArtifactTests.cs @@ -69,6 +69,48 @@ public sealed class DeploymentArtifactTests specs[1].Enabled.ShouldBeFalse(); } + [Fact] + public void ParseComposition_returns_empty_for_empty_blob() + { + var c = DeploymentArtifact.ParseComposition(ReadOnlySpan.Empty); + c.EquipmentNodes.ShouldBeEmpty(); + c.DriverInstancePlans.ShouldBeEmpty(); + c.ScriptedAlarmPlans.ShouldBeEmpty(); + } + + [Fact] + public void ParseComposition_reads_all_three_entity_classes_sorted_by_id() + { + var blob = JsonSerializer.SerializeToUtf8Bytes(new + { + Equipment = new[] + { + new { EquipmentId = "eq-z", MachineCode = "Z", UnsLineId = "line-1" }, + new { EquipmentId = "eq-a", MachineCode = "A", UnsLineId = "line-1" }, + }, + DriverInstances = new[] + { + new { DriverInstanceId = "drv-1", DriverType = "Modbus", DriverConfig = "{}" }, + }, + ScriptedAlarms = new[] + { + new + { + ScriptedAlarmId = "alarm-1", + EquipmentId = "eq-a", + PredicateScriptId = "script-1", + MessageTemplate = "high", + }, + }, + }); + + var c = DeploymentArtifact.ParseComposition(blob); + + c.EquipmentNodes.Select(e => e.EquipmentId).ShouldBe(new[] { "eq-a", "eq-z" }); + c.DriverInstancePlans.Single().DriverInstanceId.ShouldBe("drv-1"); + c.ScriptedAlarmPlans.Single().ScriptedAlarmId.ShouldBe("alarm-1"); + } + [Fact] public void Spec_missing_required_fields_is_dropped() { diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorRebuildTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorRebuildTests.cs new file mode 100644 index 0000000..b87b51e --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorRebuildTests.cs @@ -0,0 +1,144 @@ +using System.Collections.Concurrent; +using System.Text.Json; +using Akka.Actor; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using ZB.MOM.WW.OtOpcUa.OpcUaServer; +using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.OpcUa; + +public sealed class OpcUaPublishActorRebuildTests : RuntimeActorTestBase +{ + [Fact] + public void RebuildAddressSpace_with_dbFactory_loads_artifact_composes_and_applies() + { + var db = NewInMemoryDbFactory(); + var sink = new RecordingSink(); + var applier = new Phase7Applier(sink, NullLogger.Instance); + + SeedDeployment(db, equipmentIds: new[] { "eq-1", "eq-2" }, driverIds: new[] { "drv-1" }); + + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + sink: sink, + dbFactory: db, + applier: applier)); + + actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); + + AwaitAssert(() => + { + // Add path: Equipment + Driver + Alarm — but only Equipment/Alarm topology triggers + // RebuildAddressSpace. With 2 new equipment we expect one Rebuild call. + sink.RebuildCalls.ShouldBe(1); + }, duration: TimeSpan.FromSeconds(2)); + } + + [Fact] + public void Rebuild_with_no_artifact_is_idempotent_no_op() + { + var db = NewInMemoryDbFactory(); + var sink = new RecordingSink(); + var applier = new Phase7Applier(sink, NullLogger.Instance); + + // No deployment seeded — LoadLatestArtifact returns empty blob. + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + sink: sink, + dbFactory: db, + applier: applier)); + + actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); + Thread.Sleep(200); + + sink.RebuildCalls.ShouldBe(0); + } + + [Fact] + public void Second_rebuild_with_same_artifact_is_empty_plan_no_op() + { + var db = NewInMemoryDbFactory(); + var sink = new RecordingSink(); + var applier = new Phase7Applier(sink, NullLogger.Instance); + SeedDeployment(db, equipmentIds: new[] { "eq-1" }, driverIds: Array.Empty()); + + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + sink: sink, dbFactory: db, applier: applier)); + + actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); + AwaitAssert(() => sink.RebuildCalls.ShouldBe(1), duration: TimeSpan.FromSeconds(2)); + + actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); + Thread.Sleep(200); + // Same composition ⇒ plan IsEmpty ⇒ applier not called again. + sink.RebuildCalls.ShouldBe(1); + } + + [Fact] + public void Rebuild_without_dbFactory_falls_back_to_raw_sink_rebuild() + { + // Pre-#109 behavior: no dbFactory wired ⇒ RebuildAddressSpace calls _sink.RebuildAddressSpace + // directly. The dev/Mac path before the full integration is bound. + var sink = new RecordingSink(); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink)); + + actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); + + AwaitAssert(() => sink.RebuildCalls.ShouldBe(1), duration: TimeSpan.FromMilliseconds(500)); + } + + private static void SeedDeployment( + IDbContextFactory dbFactory, + string[] equipmentIds, + string[] driverIds) + { + var artifact = JsonSerializer.SerializeToUtf8Bytes(new + { + Equipment = equipmentIds.Select(id => new + { + EquipmentId = id, + MachineCode = id.ToUpperInvariant(), + UnsLineId = "line-1", + Name = id, + }).ToArray(), + DriverInstances = driverIds.Select(id => new + { + DriverInstanceId = id, + DriverType = "Modbus", + Enabled = true, + DriverConfig = "{}", + }).ToArray(), + ScriptedAlarms = Array.Empty(), + }); + + using var ctx = dbFactory.CreateDbContext(); + ctx.Deployments.Add(new Deployment + { + DeploymentId = Guid.NewGuid(), + RevisionHash = new string('a', 64), + Status = DeploymentStatus.Sealed, + CreatedBy = "test", + SealedAtUtc = DateTime.UtcNow, + ArtifactBlob = artifact, + }); + ctx.SaveChanges(); + } + + private sealed class RecordingSink : IOpcUaAddressSpaceSink + { + public ConcurrentQueue Calls { get; } = new(); + public int RebuildCalls; + public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime ts) + => Calls.Enqueue($"WV:{nodeId}"); + public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime ts) + => Calls.Enqueue($"WA:{alarmNodeId}"); + public void RebuildAddressSpace() => Interlocked.Increment(ref RebuildCalls); + } +}