diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index d2e1a971..be81c509 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -3,6 +3,7 @@ using Akka.Actor; using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; using Microsoft.EntityFrameworkCore; +using ZB.MOM.WW.OtOpcUa.Commons.Engines; using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; @@ -14,6 +15,7 @@ using ZB.MOM.WW.OtOpcUa.Configuration.Entities; using ZB.MOM.WW.OtOpcUa.Configuration.Enums; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.OpcUaServer; +using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId; namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers; @@ -52,8 +54,16 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers private readonly IActorRef? _dependencyMux; private readonly IActorRef? _opcUaPublishActor; private readonly IDriverHealthPublisher _healthPublisher; + private readonly IVirtualTagEvaluator _virtualTagEvaluator; + private readonly IActorRef? _virtualTagHostOverride; private readonly ILoggingAdapter _log = Context.GetLogger(); + /// The single VirtualTag-host child that spawns/reconciles Equipment-namespace + /// VirtualTagActors and bridges their results onto the OPC UA publish actor. Spawned in + /// when an OPC UA publish actor is wired; receives + /// from . + private IActorRef? _virtualTagHost; + private RevisionHash? _currentRevision; private DeploymentId? _applyingDeploymentId; @@ -85,6 +95,14 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers /// Optional actor reference for OPC UA publishing. /// Optional driver-health publisher; defaults to /// so test harnesses and smoke fixtures don't need to wire it. + /// Optional evaluator handed to the spawned + /// 's children; defaults to + /// (the dev/Mac path where no expression is evaluated). Production passes the DI-resolved + /// Roslyn evaluator. + /// Test seam: when supplied, this actor is used as the + /// VirtualTag host instead of spawning a real child, so tests + /// can intercept the message. Null in + /// production (the real host is spawned). public static Props Props( IDbContextFactory dbFactory, CommonsNodeId localNode, @@ -93,9 +111,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers IReadOnlySet? localRoles = null, IActorRef? dependencyMux = null, IActorRef? opcUaPublishActor = null, - IDriverHealthPublisher? healthPublisher = null) => + IDriverHealthPublisher? healthPublisher = null, + IVirtualTagEvaluator? virtualTagEvaluator = null, + IActorRef? virtualTagHostOverride = null) => Akka.Actor.Props.Create(() => new DriverHostActor( - dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor, healthPublisher)); + dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor, + healthPublisher, virtualTagEvaluator, virtualTagHostOverride)); /// Initializes a new DriverHostActor with the specified dependencies. /// Database context factory for configuration database access. @@ -106,6 +127,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers /// Optional actor reference for dependency multiplexing. /// Optional actor reference for OPC UA publishing. /// Optional driver-health publisher; defaults to . + /// Optional evaluator handed to the VirtualTag host's children; + /// defaults to . + /// Test seam: when supplied, used as the VirtualTag host + /// instead of spawning a real child. public DriverHostActor( IDbContextFactory dbFactory, CommonsNodeId localNode, @@ -114,7 +139,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers IReadOnlySet? localRoles = null, IActorRef? dependencyMux = null, IActorRef? opcUaPublishActor = null, - IDriverHealthPublisher? healthPublisher = null) + IDriverHealthPublisher? healthPublisher = null, + IVirtualTagEvaluator? virtualTagEvaluator = null, + IActorRef? virtualTagHostOverride = null) { _dbFactory = dbFactory; _localNode = localNode; @@ -124,6 +151,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers _dependencyMux = dependencyMux; _opcUaPublishActor = opcUaPublishActor; _healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance; + _virtualTagEvaluator = virtualTagEvaluator ?? NullVirtualTagEvaluator.Instance; + _virtualTagHostOverride = virtualTagHostOverride; // Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply. Become(Steady); @@ -136,9 +165,40 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DeploymentsTopic, Self)); // Subscribe to driver-control topic so AdminUI Reconnect/Restart commands land here. DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DriverControlTopic, Self)); + // Spawn the VirtualTag host BEFORE Bootstrap so the bootstrap-restore path (which routes + // through PushDesiredSubscriptions and Tells ApplyVirtualTags) has a live host to target. + SpawnVirtualTagHost(); Bootstrap(); } + /// + /// Spawns the single child that owns the Equipment-namespace + /// VirtualTagActors and bridges their results onto the OPC UA publish actor. A test-supplied + /// virtualTagHostOverride short-circuits the spawn so a probe can intercept + /// . The real host requires a non-null + /// (its ctor throws otherwise), so when no publish actor is + /// wired (legacy ControlPlane test harnesses with no OPC UA sink) the host is left null and + /// ApplyVirtualTags becomes a no-op — VirtualTags can't have anywhere to publish without it. + /// + private void SpawnVirtualTagHost() + { + if (_virtualTagHostOverride is not null) + { + _virtualTagHost = _virtualTagHostOverride; + return; + } + + if (_opcUaPublishActor is null) + { + _log.Debug("DriverHost {Node}: no OPC UA publish actor wired; skipping VirtualTag host spawn", _localNode); + return; + } + + _virtualTagHost = Context.ActorOf( + VirtualTagHostActor.Props(_opcUaPublishActor, _dependencyMux, _virtualTagEvaluator), + "virtual-tag-host"); + } + private void Bootstrap() { // Read the most-recent NodeDeploymentState for this node; if it's Applied, jump @@ -459,6 +519,17 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers _log.Info("DriverHost {Node}: SubscribeBulk pushed {Refs} references across {Drivers} driver(s)", _localNode, total, refsByDriver.Count); } + + // Hand the Equipment-namespace VirtualTags to the host so it spawns/reconciles a + // VirtualTagActor per plan and streams their evaluated values back onto the just-rebuilt + // address space. Runs on BOTH the fresh-apply path (ApplyAndAck) and the bootstrap-restore + // path (RestoreApplied) because both call this method, so one send covers both. + _virtualTagHost?.Tell(new VirtualTagHostActor.ApplyVirtualTags(composition.EquipmentVirtualTags)); + if (composition.EquipmentVirtualTags.Count > 0) + { + _log.Info("DriverHost {Node}: applied {Count} Equipment VirtualTag(s) to the VirtualTag host", + _localNode, composition.EquipmentVirtualTags.Count); + } } private void SpawnChild(DriverInstanceSpec spec) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs index ae03e824..f1ec86f6 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs @@ -5,6 +5,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.OtOpcUa.Commons.Engines; using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Configuration; @@ -89,6 +90,10 @@ public static class ServiceCollectionExtensions var serviceLevel = resolver.GetService() ?? NullServiceLevelPublisher.Instance; var loggerFactory = resolver.GetService() ?? NullLoggerFactory.Instance; var healthPublisher = resolver.GetService() ?? NullDriverHealthPublisher.Instance; + // Production evaluator is the Host's RoslynVirtualTagEvaluator (registered as + // IVirtualTagEvaluator); fall back to the null evaluator for test harnesses that don't + // register one (VirtualTagActor children then evaluate to nothing). + var virtualTagEvaluator = resolver.GetService() ?? NullVirtualTagEvaluator.Instance; var dbHealth = system.ActorOf( DbHealthProbeActor.Props(dbFactory), @@ -119,7 +124,8 @@ public static class ServiceCollectionExtensions driverFactory: driverFactory, localRoles: roleInfo.LocalRoles, dependencyMux: mux, opcUaPublishActor: publishActor, - healthPublisher: healthPublisher), + healthPublisher: healthPublisher, + virtualTagEvaluator: virtualTagEvaluator), DriverHostActorName); registry.Register(driverHost); diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorVirtualTagTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorVirtualTagTests.cs new file mode 100644 index 00000000..443996d3 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorVirtualTagTests.cs @@ -0,0 +1,141 @@ +using System.Text.Json; +using Akka.Actor; +using Microsoft.EntityFrameworkCore; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Engines; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; +using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers; + +/// +/// Verifies the live-deploy wiring of : the +/// must forward the composition's +/// EquipmentVirtualTags to its spawned VirtualTag host via +/// on BOTH the fresh-apply path and the +/// bootstrap-restore path (both route through PushDesiredSubscriptions). The host is +/// injected as a via the Props override seam so the +/// ApplyVirtualTags can be intercepted. +/// +public sealed class DriverHostActorVirtualTagTests : RuntimeActorTestBase +{ + private static readonly NodeId TestNode = NodeId.Parse("driver-vt-test"); + private static readonly RevisionHash RevA = RevisionHash.Parse(new string('a', 64)); + + /// Fresh apply: dispatching a deployment whose artifact carries one Equipment + /// VirtualTag forwards an carrying that + /// plan to the injected VirtualTag host. + [Fact] + public void Apply_forwards_EquipmentVirtualTags_to_virtual_tag_host() + { + var db = NewInMemoryDbFactory(); + var deploymentId = SeedDeploymentWithVirtualTag(db, RevA); + + var coordinator = CreateTestProbe(); + var vtHost = CreateTestProbe(); + var actor = Sys.ActorOf(DriverHostActor.Props( + db, TestNode, coordinator.Ref, + localRoles: new HashSet { "driver" }, + virtualTagEvaluator: NullVirtualTagEvaluator.Instance, + virtualTagHostOverride: vtHost.Ref)); + + actor.Tell(new DispatchDeployment(deploymentId, RevA, CorrelationId.NewId())); + + coordinator.ExpectMsg(TimeSpan.FromSeconds(5)).Outcome.ShouldBe(ApplyAckOutcome.Applied); + + var apply = vtHost.ExpectMsg(TimeSpan.FromSeconds(5)); + var plan = apply.Plans.ShouldHaveSingleItem(); + plan.VirtualTagId.ShouldBe("vt-1"); + plan.EquipmentId.ShouldBe("eq-1"); + plan.Name.ShouldBe("Doubled"); + } + + /// Bootstrap-restore: a node that already has an Applied NodeDeploymentState + /// row for a VirtualTag-carrying deployment re-forwards the + /// on PreStart (no dispatch needed), so a + /// restarted node restores its live VirtualTag children. + [Fact] + public void Restore_on_bootstrap_forwards_EquipmentVirtualTags_to_virtual_tag_host() + { + var db = NewInMemoryDbFactory(); + var deploymentId = SeedDeploymentWithVirtualTag(db, RevA); + SeedAppliedNodeState(db, deploymentId); + + var coordinator = CreateTestProbe(); + var vtHost = CreateTestProbe(); + // No DispatchDeployment — Bootstrap() should detect the Applied row and run RestoreApplied, + // which routes through PushDesiredSubscriptions and forwards ApplyVirtualTags. + Sys.ActorOf(DriverHostActor.Props( + db, TestNode, coordinator.Ref, + localRoles: new HashSet { "driver" }, + virtualTagEvaluator: NullVirtualTagEvaluator.Instance, + virtualTagHostOverride: vtHost.Ref)); + + var apply = vtHost.ExpectMsg(TimeSpan.FromSeconds(5)); + apply.Plans.ShouldHaveSingleItem().VirtualTagId.ShouldBe("vt-1"); + } + + private static DeploymentId SeedDeploymentWithVirtualTag( + IDbContextFactory db, RevisionHash rev) + { + var artifact = JsonSerializer.SerializeToUtf8Bytes(new + { + Scripts = new[] + { + new + { + ScriptId = "scr-1", + SourceCode = "return ctx.GetTag(\"TestMachine_001.TestDouble\").Value;", + }, + }, + VirtualTags = new[] + { + new + { + VirtualTagId = "vt-1", + EquipmentId = "eq-1", + Name = "Doubled", + DataType = "Float", + ScriptId = "scr-1", + }, + }, + }); + + var id = DeploymentId.NewId(); + using var ctx = db.CreateDbContext(); + ctx.Deployments.Add(new Deployment + { + DeploymentId = id.Value, + RevisionHash = rev.Value, + Status = DeploymentStatus.Sealed, + CreatedBy = "test", + SealedAtUtc = DateTime.UtcNow, + ArtifactBlob = artifact, + }); + ctx.SaveChanges(); + return id; + } + + private static void SeedAppliedNodeState( + IDbContextFactory db, DeploymentId deploymentId) + { + using var ctx = db.CreateDbContext(); + ctx.NodeDeploymentStates.Add(new NodeDeploymentState + { + NodeId = TestNode.Value, + DeploymentId = deploymentId.Value, + Status = NodeDeploymentStatus.Applied, + StartedAtUtc = DateTime.UtcNow, + AppliedAtUtc = DateTime.UtcNow, + }); + ctx.SaveChanges(); + } +}