feat(runtime): spawn+apply VirtualTagHostActor on deploy apply and restore

This commit is contained in:
Joseph Doherty
2026-06-07 05:41:04 -04:00
parent 5e2869bab7
commit 397f9b783a
3 changed files with 222 additions and 4 deletions
@@ -3,6 +3,7 @@ using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
@@ -14,6 +15,7 @@ using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
@@ -52,8 +54,16 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
private readonly IActorRef? _dependencyMux;
private readonly IActorRef? _opcUaPublishActor;
private readonly IDriverHealthPublisher _healthPublisher;
private readonly IVirtualTagEvaluator _virtualTagEvaluator;
private readonly IActorRef? _virtualTagHostOverride;
private readonly ILoggingAdapter _log = Context.GetLogger();
/// <summary>The single VirtualTag-host child that spawns/reconciles Equipment-namespace
/// VirtualTagActors and bridges their results onto the OPC UA publish actor. Spawned in
/// <see cref="PreStart"/> when an OPC UA publish actor is wired; receives
/// <see cref="VirtualTagHostActor.ApplyVirtualTags"/> from <see cref="PushDesiredSubscriptions"/>.</summary>
private IActorRef? _virtualTagHost;
private RevisionHash? _currentRevision;
private DeploymentId? _applyingDeploymentId;
@@ -85,6 +95,14 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
/// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher"/>
/// so test harnesses and smoke fixtures don't need to wire it.</param>
/// <param name="virtualTagEvaluator">Optional evaluator handed to the spawned
/// <see cref="VirtualTagHostActor"/>'s children; defaults to <see cref="NullVirtualTagEvaluator"/>
/// (the dev/Mac path where no expression is evaluated). Production passes the DI-resolved
/// Roslyn evaluator.</param>
/// <param name="virtualTagHostOverride">Test seam: when supplied, this actor is used as the
/// VirtualTag host instead of spawning a real <see cref="VirtualTagHostActor"/> child, so tests
/// can intercept the <see cref="VirtualTagHostActor.ApplyVirtualTags"/> message. Null in
/// production (the real host is spawned).</param>
public static Props Props(
IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
CommonsNodeId localNode,
@@ -93,9 +111,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
IReadOnlySet<string>? localRoles = null,
IActorRef? dependencyMux = null,
IActorRef? opcUaPublishActor = null,
IDriverHealthPublisher? healthPublisher = null) =>
IDriverHealthPublisher? healthPublisher = null,
IVirtualTagEvaluator? virtualTagEvaluator = null,
IActorRef? virtualTagHostOverride = null) =>
Akka.Actor.Props.Create(() => new DriverHostActor(
dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor, healthPublisher));
dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor,
healthPublisher, virtualTagEvaluator, virtualTagHostOverride));
/// <summary>Initializes a new DriverHostActor with the specified dependencies.</summary>
/// <param name="dbFactory">Database context factory for configuration database access.</param>
@@ -106,6 +127,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
/// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
/// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher"/>.</param>
/// <param name="virtualTagEvaluator">Optional evaluator handed to the VirtualTag host's children;
/// defaults to <see cref="NullVirtualTagEvaluator"/>.</param>
/// <param name="virtualTagHostOverride">Test seam: when supplied, used as the VirtualTag host
/// instead of spawning a real <see cref="VirtualTagHostActor"/> child.</param>
public DriverHostActor(
IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
CommonsNodeId localNode,
@@ -114,7 +139,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
IReadOnlySet<string>? localRoles = null,
IActorRef? dependencyMux = null,
IActorRef? opcUaPublishActor = null,
IDriverHealthPublisher? healthPublisher = null)
IDriverHealthPublisher? healthPublisher = null,
IVirtualTagEvaluator? virtualTagEvaluator = null,
IActorRef? virtualTagHostOverride = null)
{
_dbFactory = dbFactory;
_localNode = localNode;
@@ -124,6 +151,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
_dependencyMux = dependencyMux;
_opcUaPublishActor = opcUaPublishActor;
_healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
_virtualTagEvaluator = virtualTagEvaluator ?? NullVirtualTagEvaluator.Instance;
_virtualTagHostOverride = virtualTagHostOverride;
// Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply.
Become(Steady);
@@ -136,9 +165,40 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DeploymentsTopic, Self));
// Subscribe to driver-control topic so AdminUI Reconnect/Restart commands land here.
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DriverControlTopic, Self));
// Spawn the VirtualTag host BEFORE Bootstrap so the bootstrap-restore path (which routes
// through PushDesiredSubscriptions and Tells ApplyVirtualTags) has a live host to target.
SpawnVirtualTagHost();
Bootstrap();
}
/// <summary>
/// Spawns the single <see cref="VirtualTagHostActor"/> child that owns the Equipment-namespace
/// VirtualTagActors and bridges their results onto the OPC UA publish actor. A test-supplied
/// <c>virtualTagHostOverride</c> short-circuits the spawn so a probe can intercept
/// <see cref="VirtualTagHostActor.ApplyVirtualTags"/>. The real host requires a non-null
/// <see cref="_opcUaPublishActor"/> (its ctor throws otherwise), so when no publish actor is
/// wired (legacy ControlPlane test harnesses with no OPC UA sink) the host is left null and
/// ApplyVirtualTags becomes a no-op — VirtualTags can't have anywhere to publish without it.
/// </summary>
private void SpawnVirtualTagHost()
{
if (_virtualTagHostOverride is not null)
{
_virtualTagHost = _virtualTagHostOverride;
return;
}
if (_opcUaPublishActor is null)
{
_log.Debug("DriverHost {Node}: no OPC UA publish actor wired; skipping VirtualTag host spawn", _localNode);
return;
}
_virtualTagHost = Context.ActorOf(
VirtualTagHostActor.Props(_opcUaPublishActor, _dependencyMux, _virtualTagEvaluator),
"virtual-tag-host");
}
private void Bootstrap()
{
// Read the most-recent NodeDeploymentState for this node; if it's Applied, jump
@@ -459,6 +519,17 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
_log.Info("DriverHost {Node}: SubscribeBulk pushed {Refs} references across {Drivers} driver(s)",
_localNode, total, refsByDriver.Count);
}
// Hand the Equipment-namespace VirtualTags to the host so it spawns/reconciles a
// VirtualTagActor per plan and streams their evaluated values back onto the just-rebuilt
// address space. Runs on BOTH the fresh-apply path (ApplyAndAck) and the bootstrap-restore
// path (RestoreApplied) because both call this method, so one send covers both.
_virtualTagHost?.Tell(new VirtualTagHostActor.ApplyVirtualTags(composition.EquipmentVirtualTags));
if (composition.EquipmentVirtualTags.Count > 0)
{
_log.Info("DriverHost {Node}: applied {Count} Equipment VirtualTag(s) to the VirtualTag host",
_localNode, composition.EquipmentVirtualTags.Count);
}
}
private void SpawnChild(DriverInstanceSpec spec)
@@ -5,6 +5,7 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Configuration;
@@ -89,6 +90,10 @@ public static class ServiceCollectionExtensions
var serviceLevel = resolver.GetService<IServiceLevelPublisher>() ?? NullServiceLevelPublisher.Instance;
var loggerFactory = resolver.GetService<ILoggerFactory>() ?? NullLoggerFactory.Instance;
var healthPublisher = resolver.GetService<IDriverHealthPublisher>() ?? NullDriverHealthPublisher.Instance;
// Production evaluator is the Host's RoslynVirtualTagEvaluator (registered as
// IVirtualTagEvaluator); fall back to the null evaluator for test harnesses that don't
// register one (VirtualTagActor children then evaluate to nothing).
var virtualTagEvaluator = resolver.GetService<IVirtualTagEvaluator>() ?? NullVirtualTagEvaluator.Instance;
var dbHealth = system.ActorOf(
DbHealthProbeActor.Props(dbFactory),
@@ -119,7 +124,8 @@ public static class ServiceCollectionExtensions
driverFactory: driverFactory, localRoles: roleInfo.LocalRoles,
dependencyMux: mux,
opcUaPublishActor: publishActor,
healthPublisher: healthPublisher),
healthPublisher: healthPublisher,
virtualTagEvaluator: virtualTagEvaluator),
DriverHostActorName);
registry.Register<DriverHostActorKey>(driverHost);
@@ -0,0 +1,141 @@
using System.Text.Json;
using Akka.Actor;
using Microsoft.EntityFrameworkCore;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
/// <summary>
/// Verifies the live-deploy wiring of <see cref="VirtualTagHostActor"/>: the
/// <see cref="DriverHostActor"/> must forward the composition's
/// <c>EquipmentVirtualTags</c> to its spawned VirtualTag host via
/// <see cref="VirtualTagHostActor.ApplyVirtualTags"/> on BOTH the fresh-apply path and the
/// bootstrap-restore path (both route through <c>PushDesiredSubscriptions</c>). The host is
/// injected as a <see cref="Akka.TestKit.TestProbe"/> via the Props override seam so the
/// ApplyVirtualTags can be intercepted.
/// </summary>
public sealed class DriverHostActorVirtualTagTests : RuntimeActorTestBase
{
private static readonly NodeId TestNode = NodeId.Parse("driver-vt-test");
private static readonly RevisionHash RevA = RevisionHash.Parse(new string('a', 64));
/// <summary>Fresh apply: dispatching a deployment whose artifact carries one Equipment
/// VirtualTag forwards an <see cref="VirtualTagHostActor.ApplyVirtualTags"/> carrying that
/// plan to the injected VirtualTag host.</summary>
[Fact]
public void Apply_forwards_EquipmentVirtualTags_to_virtual_tag_host()
{
var db = NewInMemoryDbFactory();
var deploymentId = SeedDeploymentWithVirtualTag(db, RevA);
var coordinator = CreateTestProbe();
var vtHost = CreateTestProbe();
var actor = Sys.ActorOf(DriverHostActor.Props(
db, TestNode, coordinator.Ref,
localRoles: new HashSet<string> { "driver" },
virtualTagEvaluator: NullVirtualTagEvaluator.Instance,
virtualTagHostOverride: vtHost.Ref));
actor.Tell(new DispatchDeployment(deploymentId, RevA, CorrelationId.NewId()));
coordinator.ExpectMsg<ApplyAck>(TimeSpan.FromSeconds(5)).Outcome.ShouldBe(ApplyAckOutcome.Applied);
var apply = vtHost.ExpectMsg<VirtualTagHostActor.ApplyVirtualTags>(TimeSpan.FromSeconds(5));
var plan = apply.Plans.ShouldHaveSingleItem();
plan.VirtualTagId.ShouldBe("vt-1");
plan.EquipmentId.ShouldBe("eq-1");
plan.Name.ShouldBe("Doubled");
}
/// <summary>Bootstrap-restore: a node that already has an <c>Applied</c> NodeDeploymentState
/// row for a VirtualTag-carrying deployment re-forwards the
/// <see cref="VirtualTagHostActor.ApplyVirtualTags"/> on PreStart (no dispatch needed), so a
/// restarted node restores its live VirtualTag children.</summary>
[Fact]
public void Restore_on_bootstrap_forwards_EquipmentVirtualTags_to_virtual_tag_host()
{
var db = NewInMemoryDbFactory();
var deploymentId = SeedDeploymentWithVirtualTag(db, RevA);
SeedAppliedNodeState(db, deploymentId);
var coordinator = CreateTestProbe();
var vtHost = CreateTestProbe();
// No DispatchDeployment — Bootstrap() should detect the Applied row and run RestoreApplied,
// which routes through PushDesiredSubscriptions and forwards ApplyVirtualTags.
Sys.ActorOf(DriverHostActor.Props(
db, TestNode, coordinator.Ref,
localRoles: new HashSet<string> { "driver" },
virtualTagEvaluator: NullVirtualTagEvaluator.Instance,
virtualTagHostOverride: vtHost.Ref));
var apply = vtHost.ExpectMsg<VirtualTagHostActor.ApplyVirtualTags>(TimeSpan.FromSeconds(5));
apply.Plans.ShouldHaveSingleItem().VirtualTagId.ShouldBe("vt-1");
}
private static DeploymentId SeedDeploymentWithVirtualTag(
IDbContextFactory<OtOpcUaConfigDbContext> db, RevisionHash rev)
{
var artifact = JsonSerializer.SerializeToUtf8Bytes(new
{
Scripts = new[]
{
new
{
ScriptId = "scr-1",
SourceCode = "return ctx.GetTag(\"TestMachine_001.TestDouble\").Value;",
},
},
VirtualTags = new[]
{
new
{
VirtualTagId = "vt-1",
EquipmentId = "eq-1",
Name = "Doubled",
DataType = "Float",
ScriptId = "scr-1",
},
},
});
var id = DeploymentId.NewId();
using var ctx = db.CreateDbContext();
ctx.Deployments.Add(new Deployment
{
DeploymentId = id.Value,
RevisionHash = rev.Value,
Status = DeploymentStatus.Sealed,
CreatedBy = "test",
SealedAtUtc = DateTime.UtcNow,
ArtifactBlob = artifact,
});
ctx.SaveChanges();
return id;
}
private static void SeedAppliedNodeState(
IDbContextFactory<OtOpcUaConfigDbContext> db, DeploymentId deploymentId)
{
using var ctx = db.CreateDbContext();
ctx.NodeDeploymentStates.Add(new NodeDeploymentState
{
NodeId = TestNode.Value,
DeploymentId = deploymentId.Value,
Status = NodeDeploymentStatus.Applied,
StartedAtUtc = DateTime.UtcNow,
AppliedAtUtc = DateTime.UtcNow,
});
ctx.SaveChanges();
}
}