diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DeploymentArtifact.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DeploymentArtifact.cs index 853e7c7..8cfecd8 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DeploymentArtifact.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DeploymentArtifact.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using ZB.MOM.WW.OtOpcUa.OpcUaServer; namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers; @@ -75,4 +76,98 @@ public static class DeploymentArtifact Enabled: enabled, DriverConfig: config ?? "{}"); } + + /// + /// Parse the artifact into the projected used by + /// Phase7Planner + Phase7Applier. Returns an empty composition for empty/ + /// malformed blobs so callers can treat parse failure as a no-op deploy. + /// + /// The artifact JSON is produced by ConfigComposer.SnapshotAndFlattenAsync in the + /// ControlPlane — its Pascal-case property names match the EF entities. We only need a + /// subset of fields per entity class to drive the address-space rebuild on driver-role + /// nodes. + /// + public static Phase7CompositionResult ParseComposition(ReadOnlySpan blob) + { + if (blob.IsEmpty) + { + return new Phase7CompositionResult( + Array.Empty(), + Array.Empty(), + Array.Empty()); + } + + try + { + using var doc = JsonDocument.Parse(blob.ToArray()); + var root = doc.RootElement; + + var equipment = ReadArray(root, "Equipment", ReadEquipmentNode); + var drivers = ReadArray(root, "DriverInstances", ReadDriverPlan); + var alarms = ReadArray(root, "ScriptedAlarms", ReadAlarmPlan); + + return new Phase7CompositionResult(equipment, drivers, alarms); + } + catch (JsonException) + { + return new Phase7CompositionResult( + Array.Empty(), + Array.Empty(), + Array.Empty()); + } + } + + private static IReadOnlyList ReadArray(JsonElement root, string propertyName, Func reader) + where T : class + { + if (!root.TryGetProperty(propertyName, out var arr) || arr.ValueKind != JsonValueKind.Array) + return Array.Empty(); + + var result = new List(arr.GetArrayLength()); + foreach (var el in arr.EnumerateArray()) + { + if (el.ValueKind != JsonValueKind.Object) continue; + var item = reader(el); + if (item is not null) result.Add(item); + } + // Match Phase7Composer's natural-key sort so plan diffs are deterministic across + // artifact-decode + composer-compose passes. + return result.OrderBy(IdentityOf, StringComparer.Ordinal).ToList(); + } + + private static string IdentityOf(T item) where T : class => item switch + { + EquipmentNode e => e.EquipmentId, + DriverInstancePlan d => d.DriverInstanceId, + ScriptedAlarmPlan a => a.ScriptedAlarmId, + _ => string.Empty, + }; + + private static EquipmentNode? ReadEquipmentNode(JsonElement el) + { + var id = el.TryGetProperty("EquipmentId", out var idEl) ? idEl.GetString() : null; + var displayName = el.TryGetProperty("MachineCode", out var mcEl) ? mcEl.GetString() : null; + var lineId = el.TryGetProperty("UnsLineId", out var lineEl) ? lineEl.GetString() : null; + if (string.IsNullOrWhiteSpace(id)) return null; + return new EquipmentNode(id!, displayName ?? id!, lineId ?? string.Empty); + } + + private static DriverInstancePlan? ReadDriverPlan(JsonElement el) + { + var id = el.TryGetProperty("DriverInstanceId", out var idEl) ? idEl.GetString() : null; + var type = el.TryGetProperty("DriverType", out var typeEl) ? typeEl.GetString() : null; + var config = el.TryGetProperty("DriverConfig", out var cfgEl) ? cfgEl.GetString() : null; + if (string.IsNullOrWhiteSpace(id) || string.IsNullOrWhiteSpace(type)) return null; + return new DriverInstancePlan(id!, type!, config ?? "{}"); + } + + private static ScriptedAlarmPlan? ReadAlarmPlan(JsonElement el) + { + var id = el.TryGetProperty("ScriptedAlarmId", out var idEl) ? idEl.GetString() : null; + var equipmentId = el.TryGetProperty("EquipmentId", out var eqEl) ? eqEl.GetString() : null; + var script = el.TryGetProperty("PredicateScriptId", out var scEl) ? scEl.GetString() : null; + var template = el.TryGetProperty("MessageTemplate", out var tmEl) ? tmEl.GetString() : null; + if (string.IsNullOrWhiteSpace(id)) return null; + return new ScriptedAlarmPlan(id!, equipmentId ?? string.Empty, script ?? string.Empty, template ?? string.Empty); + } } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index 28af2ae..1934ccd 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -42,6 +42,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers private readonly IDriverFactory _driverFactory; private readonly IReadOnlySet _localRoles; private readonly IActorRef? _dependencyMux; + private readonly IActorRef? _opcUaPublishActor; private readonly ILoggingAdapter _log = Context.GetLogger(); private RevisionHash? _currentRevision; @@ -65,9 +66,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers IActorRef? coordinator = null, IDriverFactory? driverFactory = null, IReadOnlySet? localRoles = null, - IActorRef? dependencyMux = null) => + IActorRef? dependencyMux = null, + IActorRef? opcUaPublishActor = null) => Akka.Actor.Props.Create(() => new DriverHostActor( - dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux)); + dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor)); public DriverHostActor( IDbContextFactory dbFactory, @@ -75,7 +77,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers IActorRef? coordinator, IDriverFactory? driverFactory = null, IReadOnlySet? localRoles = null, - IActorRef? dependencyMux = null) + IActorRef? dependencyMux = null, + IActorRef? opcUaPublishActor = null) { _dbFactory = dbFactory; _localNode = localNode; @@ -83,6 +86,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers _driverFactory = driverFactory ?? NullDriverFactory.Instance; _localRoles = localRoles ?? new HashSet(StringComparer.Ordinal); _dependencyMux = dependencyMux; + _opcUaPublishActor = opcUaPublishActor; // Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply. Become(Steady); @@ -244,6 +248,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers _currentRevision = revision; UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null); SendAck(deploymentId, ApplyAckOutcome.Applied, failureReason: null, correlation); + // Trigger the OPC UA address-space rebuild so the local SDK reflects the new + // composition. The publish actor handles the load-compose-diff-apply pipeline; we + // just forward the same correlation id so the audit trail joins up. + _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation)); _log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})", _localNode, deploymentId, revision, _children.Count); } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs index 05f8f99..2c24826 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs @@ -1,9 +1,13 @@ using Akka.Actor; using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; +using Microsoft.EntityFrameworkCore; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.OpcUaServer; +using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; @@ -33,26 +37,38 @@ public sealed class OpcUaPublishActor : ReceiveActor private readonly IServiceLevelPublisher _serviceLevel; private readonly bool _subscribeRedundancyTopic; private readonly NodeId? _localNode; + private readonly IDbContextFactory? _dbFactory; + private readonly Phase7Applier? _applier; private readonly ILoggingAdapter _log = Context.GetLogger(); private int _writes; private byte _lastServiceLevel; + private Phase7CompositionResult _lastApplied = new( + Array.Empty(), + Array.Empty(), + Array.Empty()); public int WriteCount => _writes; public byte LastServiceLevel => _lastServiceLevel; /// Production Props — pins the OPC UA dispatcher + subscribes to the /// redundancy-state DPS topic so cluster transitions drive the local ServiceLevel - /// publish path. + /// publish path. When + are supplied, + /// reads the latest deployment artifact + drives the + /// applier through the sink. public static Props Props( IOpcUaAddressSpaceSink? sink = null, IServiceLevelPublisher? serviceLevel = null, - NodeId? localNode = null) => + NodeId? localNode = null, + IDbContextFactory? dbFactory = null, + Phase7Applier? applier = null) => Akka.Actor.Props.Create(() => new OpcUaPublishActor( sink ?? NullOpcUaAddressSpaceSink.Instance, serviceLevel ?? NullServiceLevelPublisher.Instance, subscribeRedundancyTopic: true, - localNode)).WithDispatcher(DispatcherId); + localNode, + dbFactory, + applier)).WithDispatcher(DispatcherId); /// Test-only Props that omits the pinned-dispatcher requirement and skips the /// DPS subscribe so unit tests can spin up the actor on a vanilla TestKit cluster. @@ -60,23 +76,31 @@ public sealed class OpcUaPublishActor : ReceiveActor IOpcUaAddressSpaceSink? sink = null, IServiceLevelPublisher? serviceLevel = null, bool subscribeRedundancyTopic = false, - NodeId? localNode = null) => + NodeId? localNode = null, + IDbContextFactory? dbFactory = null, + Phase7Applier? applier = null) => Akka.Actor.Props.Create(() => new OpcUaPublishActor( sink ?? NullOpcUaAddressSpaceSink.Instance, serviceLevel ?? NullServiceLevelPublisher.Instance, subscribeRedundancyTopic, - localNode)); + localNode, + dbFactory, + applier)); public OpcUaPublishActor( IOpcUaAddressSpaceSink sink, IServiceLevelPublisher serviceLevel, bool subscribeRedundancyTopic, - NodeId? localNode) + NodeId? localNode, + IDbContextFactory? dbFactory = null, + Phase7Applier? applier = null) { _sink = sink; _serviceLevel = serviceLevel; _subscribeRedundancyTopic = subscribeRedundancyTopic; _localNode = localNode; + _dbFactory = dbFactory; + _applier = applier; Receive(HandleAttributeUpdate); Receive(HandleAlarmUpdate); @@ -122,15 +146,61 @@ public sealed class OpcUaPublishActor : ReceiveActor private void HandleRebuild(RebuildAddressSpace msg) { + // Two modes: when dbFactory + applier are wired, do a real diff-and-apply pass against + // the latest deployment artifact. Without them, fall back to a raw sink rebuild — the + // F10b/dev path before the integration completes. + if (_dbFactory is null || _applier is null) + { + try { _sink.RebuildAddressSpace(); } + catch (Exception ex) + { + _log.Error(ex, "OpcUaPublish: sink.RebuildAddressSpace threw (correlation={Correlation})", + msg.Correlation); + } + return; + } + try { - _sink.RebuildAddressSpace(); - _log.Info("OpcUaPublish: address-space rebuilt (correlation={Correlation})", msg.Correlation); + var artifact = LoadLatestArtifact(); + var composition = DeploymentArtifact.ParseComposition(artifact); + var plan = Phase7Planner.Compute(_lastApplied, composition); + + if (plan.IsEmpty) + { + _log.Debug("OpcUaPublish: rebuild requested but plan is empty (correlation={Correlation})", + msg.Correlation); + return; + } + + var outcome = _applier.Apply(plan); + _lastApplied = composition; + _log.Info("OpcUaPublish: applied rebuild (correlation={Correlation}, added={Added}, removed={Removed}, changed={Changed}, rebuild={Rebuild})", + msg.Correlation, outcome.AddedNodes, outcome.RemovedNodes, outcome.ChangedNodes, outcome.RebuildCalled); } catch (Exception ex) { - _log.Error(ex, "OpcUaPublish: sink.RebuildAddressSpace threw (correlation={Correlation})", - msg.Correlation); + _log.Error(ex, "OpcUaPublish: rebuild pipeline threw (correlation={Correlation})", msg.Correlation); + } + } + + /// Read the most recent Sealed deployment's artifact blob from ConfigDb. + /// Empty array on any failure — the parser treats empty blob as "no composition". + private byte[] LoadLatestArtifact() + { + try + { + using var db = _dbFactory!.CreateDbContext(); + return db.Deployments.AsNoTracking() + .Where(d => d.Status == Configuration.Enums.DeploymentStatus.Sealed) + .OrderByDescending(d => d.SealedAtUtc) + .Select(d => d.ArtifactBlob) + .FirstOrDefault() ?? Array.Empty(); + } + catch (Exception ex) + { + _log.Warning(ex, "OpcUaPublish: failed to load latest deployment artifact; rebuild becomes no-op"); + return Array.Empty(); } } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DeploymentArtifactTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DeploymentArtifactTests.cs index a4bbd36..6c715cb 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DeploymentArtifactTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DeploymentArtifactTests.cs @@ -69,6 +69,48 @@ public sealed class DeploymentArtifactTests specs[1].Enabled.ShouldBeFalse(); } + [Fact] + public void ParseComposition_returns_empty_for_empty_blob() + { + var c = DeploymentArtifact.ParseComposition(ReadOnlySpan.Empty); + c.EquipmentNodes.ShouldBeEmpty(); + c.DriverInstancePlans.ShouldBeEmpty(); + c.ScriptedAlarmPlans.ShouldBeEmpty(); + } + + [Fact] + public void ParseComposition_reads_all_three_entity_classes_sorted_by_id() + { + var blob = JsonSerializer.SerializeToUtf8Bytes(new + { + Equipment = new[] + { + new { EquipmentId = "eq-z", MachineCode = "Z", UnsLineId = "line-1" }, + new { EquipmentId = "eq-a", MachineCode = "A", UnsLineId = "line-1" }, + }, + DriverInstances = new[] + { + new { DriverInstanceId = "drv-1", DriverType = "Modbus", DriverConfig = "{}" }, + }, + ScriptedAlarms = new[] + { + new + { + ScriptedAlarmId = "alarm-1", + EquipmentId = "eq-a", + PredicateScriptId = "script-1", + MessageTemplate = "high", + }, + }, + }); + + var c = DeploymentArtifact.ParseComposition(blob); + + c.EquipmentNodes.Select(e => e.EquipmentId).ShouldBe(new[] { "eq-a", "eq-z" }); + c.DriverInstancePlans.Single().DriverInstanceId.ShouldBe("drv-1"); + c.ScriptedAlarmPlans.Single().ScriptedAlarmId.ShouldBe("alarm-1"); + } + [Fact] public void Spec_missing_required_fields_is_dropped() { diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorRebuildTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorRebuildTests.cs new file mode 100644 index 0000000..b87b51e --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorRebuildTests.cs @@ -0,0 +1,144 @@ +using System.Collections.Concurrent; +using System.Text.Json; +using Akka.Actor; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using ZB.MOM.WW.OtOpcUa.OpcUaServer; +using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.OpcUa; + +public sealed class OpcUaPublishActorRebuildTests : RuntimeActorTestBase +{ + [Fact] + public void RebuildAddressSpace_with_dbFactory_loads_artifact_composes_and_applies() + { + var db = NewInMemoryDbFactory(); + var sink = new RecordingSink(); + var applier = new Phase7Applier(sink, NullLogger.Instance); + + SeedDeployment(db, equipmentIds: new[] { "eq-1", "eq-2" }, driverIds: new[] { "drv-1" }); + + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + sink: sink, + dbFactory: db, + applier: applier)); + + actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); + + AwaitAssert(() => + { + // Add path: Equipment + Driver + Alarm — but only Equipment/Alarm topology triggers + // RebuildAddressSpace. With 2 new equipment we expect one Rebuild call. + sink.RebuildCalls.ShouldBe(1); + }, duration: TimeSpan.FromSeconds(2)); + } + + [Fact] + public void Rebuild_with_no_artifact_is_idempotent_no_op() + { + var db = NewInMemoryDbFactory(); + var sink = new RecordingSink(); + var applier = new Phase7Applier(sink, NullLogger.Instance); + + // No deployment seeded — LoadLatestArtifact returns empty blob. + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + sink: sink, + dbFactory: db, + applier: applier)); + + actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); + Thread.Sleep(200); + + sink.RebuildCalls.ShouldBe(0); + } + + [Fact] + public void Second_rebuild_with_same_artifact_is_empty_plan_no_op() + { + var db = NewInMemoryDbFactory(); + var sink = new RecordingSink(); + var applier = new Phase7Applier(sink, NullLogger.Instance); + SeedDeployment(db, equipmentIds: new[] { "eq-1" }, driverIds: Array.Empty()); + + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + sink: sink, dbFactory: db, applier: applier)); + + actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); + AwaitAssert(() => sink.RebuildCalls.ShouldBe(1), duration: TimeSpan.FromSeconds(2)); + + actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); + Thread.Sleep(200); + // Same composition ⇒ plan IsEmpty ⇒ applier not called again. + sink.RebuildCalls.ShouldBe(1); + } + + [Fact] + public void Rebuild_without_dbFactory_falls_back_to_raw_sink_rebuild() + { + // Pre-#109 behavior: no dbFactory wired ⇒ RebuildAddressSpace calls _sink.RebuildAddressSpace + // directly. The dev/Mac path before the full integration is bound. + var sink = new RecordingSink(); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink)); + + actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); + + AwaitAssert(() => sink.RebuildCalls.ShouldBe(1), duration: TimeSpan.FromMilliseconds(500)); + } + + private static void SeedDeployment( + IDbContextFactory dbFactory, + string[] equipmentIds, + string[] driverIds) + { + var artifact = JsonSerializer.SerializeToUtf8Bytes(new + { + Equipment = equipmentIds.Select(id => new + { + EquipmentId = id, + MachineCode = id.ToUpperInvariant(), + UnsLineId = "line-1", + Name = id, + }).ToArray(), + DriverInstances = driverIds.Select(id => new + { + DriverInstanceId = id, + DriverType = "Modbus", + Enabled = true, + DriverConfig = "{}", + }).ToArray(), + ScriptedAlarms = Array.Empty(), + }); + + using var ctx = dbFactory.CreateDbContext(); + ctx.Deployments.Add(new Deployment + { + DeploymentId = Guid.NewGuid(), + RevisionHash = new string('a', 64), + Status = DeploymentStatus.Sealed, + CreatedBy = "test", + SealedAtUtc = DateTime.UtcNow, + ArtifactBlob = artifact, + }); + ctx.SaveChanges(); + } + + private sealed class RecordingSink : IOpcUaAddressSpaceSink + { + public ConcurrentQueue Calls { get; } = new(); + public int RebuildCalls; + public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime ts) + => Calls.Enqueue($"WV:{nodeId}"); + public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime ts) + => Calls.Enqueue($"WA:{alarmNodeId}"); + public void RebuildAddressSpace() => Interlocked.Increment(ref RebuildCalls); + } +}