feat(runtime): VirtualTagHostActor spawns VTag actors + bridges results to OPC UA
This commit is contained in:
@@ -0,0 +1,138 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Event;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
|
||||
/// <summary>
|
||||
/// Supervisor that gives Equipment-namespace VirtualTags live values. For each
|
||||
/// <see cref="EquipmentVirtualTagPlan"/> in the desired set it spawns one child
|
||||
/// <see cref="VirtualTagActor"/> (which self-registers with the dependency mux and evaluates its
|
||||
/// expression on dependency changes) and remembers the plan's <b>folder-scoped NodeId</b>. When a
|
||||
/// child reports a fresh <see cref="VirtualTagActor.EvaluationResult"/>, the host bridges it onto
|
||||
/// an <see cref="OpcUaPublishActor.AttributeValueUpdate"/> targeting that NodeId so the
|
||||
/// already-materialised Variable node (currently BadWaitingForInitialData) reflects the value.
|
||||
///
|
||||
/// <para>
|
||||
/// The published NodeId is computed with the <b>identical</b> formula
|
||||
/// <c>Phase7Applier.MaterialiseEquipmentVirtualTags</c> uses to materialise the variable —
|
||||
/// <c>{parent}/{Name}</c> where <c>parent = IsNullOrWhiteSpace(FolderPath) ? EquipmentId :
|
||||
/// {EquipmentId}/{FolderPath}</c> — or the value would land on a NodeId that does not exist.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
public sealed class VirtualTagHostActor : ReceiveActor
|
||||
{
|
||||
/// <summary>Reconciles the live VirtualTag children to exactly the supplied desired set:
|
||||
/// stops children whose vtagId is gone, spawns children for new vtagIds, and rebuilds the
|
||||
/// vtagId→NodeId map so renames are reflected.</summary>
|
||||
/// <param name="Plans">The desired Equipment-namespace VirtualTag plans.</param>
|
||||
public sealed record ApplyVirtualTags(IReadOnlyList<EquipmentVirtualTagPlan> Plans);
|
||||
|
||||
private readonly IActorRef _publishActor;
|
||||
private readonly IActorRef? _mux;
|
||||
private readonly IVirtualTagEvaluator _evaluator;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
|
||||
// vtagId -> spawned child VirtualTagActor.
|
||||
private readonly Dictionary<string, IActorRef> _children = new(StringComparer.Ordinal);
|
||||
// vtagId -> folder-scoped OPC UA NodeId the materialiser placed the variable at.
|
||||
private readonly Dictionary<string, string> _nodeIdByVtag = new(StringComparer.Ordinal);
|
||||
|
||||
/// <summary>Factory method to create Props for a VirtualTagHostActor.</summary>
|
||||
/// <param name="publishActor">The OPC UA publish actor that consumes
|
||||
/// <see cref="OpcUaPublishActor.AttributeValueUpdate"/> bridged from child results.</param>
|
||||
/// <param name="mux">Optional dependency multiplexer; passed to each spawned child so it can
|
||||
/// register interest in its dependency refs. Null on the dev/Mac path (no live values).</param>
|
||||
/// <param name="evaluator">The evaluator each child uses to compute its expression.</param>
|
||||
public static Props Props(IActorRef publishActor, IActorRef? mux, IVirtualTagEvaluator evaluator) =>
|
||||
Akka.Actor.Props.Create(() => new VirtualTagHostActor(publishActor, mux, evaluator));
|
||||
|
||||
/// <summary>Initializes a new instance of the <see cref="VirtualTagHostActor"/> class.</summary>
|
||||
/// <param name="publishActor">The OPC UA publish actor results are bridged to.</param>
|
||||
/// <param name="mux">Optional dependency multiplexer passed to each spawned child.</param>
|
||||
/// <param name="evaluator">The evaluator each child uses to compute its expression.</param>
|
||||
public VirtualTagHostActor(IActorRef publishActor, IActorRef? mux, IVirtualTagEvaluator evaluator)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(publishActor);
|
||||
ArgumentNullException.ThrowIfNull(evaluator);
|
||||
_publishActor = publishActor;
|
||||
_mux = mux;
|
||||
_evaluator = evaluator;
|
||||
|
||||
Receive<ApplyVirtualTags>(OnApply);
|
||||
Receive<VirtualTagActor.EvaluationResult>(OnResult);
|
||||
}
|
||||
|
||||
private void OnApply(ApplyVirtualTags msg)
|
||||
{
|
||||
var desired = new HashSet<string>(msg.Plans.Select(p => p.VirtualTagId), StringComparer.Ordinal);
|
||||
|
||||
// Stop + forget children whose vtagId is no longer desired. Stopping the child triggers its
|
||||
// PostStop, which unregisters its interest from the mux.
|
||||
foreach (var vtagId in _children.Keys.Where(id => !desired.Contains(id)).ToList())
|
||||
{
|
||||
Context.Stop(_children[vtagId]);
|
||||
_children.Remove(vtagId);
|
||||
}
|
||||
|
||||
// Rebuild the NodeId map every apply so renames (Name/FolderPath/EquipmentId changes) are
|
||||
// picked up. The map only contains currently-desired vtags, so a result for a removed vtag
|
||||
// finds no entry and is dropped.
|
||||
_nodeIdByVtag.Clear();
|
||||
foreach (var p in msg.Plans)
|
||||
{
|
||||
_nodeIdByVtag[p.VirtualTagId] = NodeIdFor(p);
|
||||
}
|
||||
|
||||
// Spawn children for new vtagIds only — existing children keep their mux subscriptions and
|
||||
// last-value dedup state. Expression/dependency changes on an existing vtag are NOT
|
||||
// re-applied here; the loader's vtags are stable, and a future enhancement can stop+respawn
|
||||
// a child whose plan changed (the diff already identifies ChangedEquipmentVirtualTags).
|
||||
foreach (var p in msg.Plans)
|
||||
{
|
||||
if (_children.ContainsKey(p.VirtualTagId)) continue;
|
||||
|
||||
// Auto-name the child: vtagIds can contain characters illegal in actor names, so let Akka
|
||||
// assign a safe unique name. The child self-registers with the mux in PreStart.
|
||||
var child = Context.ActorOf(VirtualTagActor.Props(
|
||||
virtualTagId: p.VirtualTagId,
|
||||
expression: p.Expression,
|
||||
evaluator: _evaluator,
|
||||
scriptId: p.VirtualTagId,
|
||||
publisherFactory: null,
|
||||
dependencyRefs: p.DependencyRefs,
|
||||
mux: _mux));
|
||||
_children[p.VirtualTagId] = child;
|
||||
}
|
||||
|
||||
_log.Debug("VirtualTagHost: applied (desired={Desired}, children={Children})",
|
||||
desired.Count, _children.Count);
|
||||
}
|
||||
|
||||
private void OnResult(VirtualTagActor.EvaluationResult result)
|
||||
{
|
||||
// A result may arrive for a vtag that was just removed from the desired set (the child's
|
||||
// last in-flight message). With no NodeId mapping we have nowhere to land it — drop silently.
|
||||
if (!_nodeIdByVtag.TryGetValue(result.VirtualTagId, out var nodeId))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_publishActor.Tell(new OpcUaPublishActor.AttributeValueUpdate(
|
||||
nodeId, result.Value, OpcUaQuality.Good, result.TimestampUtc));
|
||||
}
|
||||
|
||||
/// <summary>Folder-scoped NodeId for a VirtualTag plan — MUST match
|
||||
/// <c>Phase7Applier.MaterialiseEquipmentVirtualTags</c> exactly, or the published value lands on a
|
||||
/// NodeId that was never materialised.</summary>
|
||||
private static string NodeIdFor(EquipmentVirtualTagPlan p)
|
||||
{
|
||||
var parent = string.IsNullOrWhiteSpace(p.FolderPath)
|
||||
? p.EquipmentId
|
||||
: $"{p.EquipmentId}/{p.FolderPath}";
|
||||
return $"{parent}/{p.Name}";
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,130 @@
|
||||
using Akka.Actor;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.VirtualTags;
|
||||
|
||||
/// <summary>
|
||||
/// Verifies <see cref="VirtualTagHostActor"/> reconciles a desired set of
|
||||
/// <see cref="EquipmentVirtualTagPlan"/> into child <see cref="VirtualTagActor"/>s and bridges each
|
||||
/// child's <see cref="VirtualTagActor.EvaluationResult"/> onto an
|
||||
/// <see cref="OpcUaPublishActor.AttributeValueUpdate"/> carrying the folder-scoped NodeId computed by
|
||||
/// the materialiser.
|
||||
/// </summary>
|
||||
public sealed class VirtualTagHostActorTests : RuntimeActorTestBase
|
||||
{
|
||||
/// <summary>A plan with no FolderPath maps onto NodeId "EquipmentId/Name".</summary>
|
||||
private static EquipmentVirtualTagPlan Plan(
|
||||
string vtagId, string equipmentId, string name, string folderPath = "") =>
|
||||
new(
|
||||
VirtualTagId: vtagId,
|
||||
EquipmentId: equipmentId,
|
||||
FolderPath: folderPath,
|
||||
Name: name,
|
||||
DataType: "Double",
|
||||
Expression: "ctx.GetTag(\"a\")",
|
||||
DependencyRefs: new[] { "a" });
|
||||
|
||||
/// <summary>Spawn: an apply with one plan spins up exactly one live child VirtualTagActor.</summary>
|
||||
[Fact]
|
||||
public void ApplyVirtualTags_spawns_one_child_per_plan()
|
||||
{
|
||||
var publish = CreateTestProbe();
|
||||
var mux = CreateTestProbe();
|
||||
var host = Sys.ActorOf(VirtualTagHostActor.Props(publish.Ref, mux.Ref, new StubEvaluator()));
|
||||
|
||||
host.Tell(new VirtualTagHostActor.ApplyVirtualTags(new[] { Plan("vt-1", "eq-1", "speed-rpm") }));
|
||||
|
||||
// The child self-registers with the mux in PreStart, so a RegisterInterest landing on the
|
||||
// mux probe is proof the host spawned a live child.
|
||||
var reg = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
|
||||
reg.TagRefs.ShouldContain("a");
|
||||
}
|
||||
|
||||
/// <summary>KEY TEST: a child EvaluationResult is bridged to the publish actor with the
|
||||
/// folder-scoped NodeId, Value, Good quality, and source timestamp preserved.</summary>
|
||||
[Fact]
|
||||
public void EvaluationResult_is_bridged_with_folder_scoped_NodeId()
|
||||
{
|
||||
var publish = CreateTestProbe();
|
||||
var host = Sys.ActorOf(VirtualTagHostActor.Props(publish.Ref, mux: null, new StubEvaluator()));
|
||||
|
||||
host.Tell(new VirtualTagHostActor.ApplyVirtualTags(new[] { Plan("vt-1", "eq-1", "speed-rpm") }));
|
||||
|
||||
var ts = new DateTime(2026, 6, 7, 12, 0, 0, DateTimeKind.Utc);
|
||||
host.Tell(new VirtualTagActor.EvaluationResult("vt-1", 42.0, ts, CorrelationId.NewId()));
|
||||
|
||||
var update = publish.ExpectMsg<OpcUaPublishActor.AttributeValueUpdate>();
|
||||
update.NodeId.ShouldBe("eq-1/speed-rpm");
|
||||
update.Value.ShouldBe(42.0);
|
||||
update.Quality.ShouldBe(OpcUaQuality.Good);
|
||||
update.TimestampUtc.ShouldBe(ts);
|
||||
}
|
||||
|
||||
/// <summary>FolderPath is honoured in the published NodeId (EquipmentId/FolderPath/Name).</summary>
|
||||
[Fact]
|
||||
public void EvaluationResult_NodeId_includes_folder_path_when_set()
|
||||
{
|
||||
var publish = CreateTestProbe();
|
||||
var host = Sys.ActorOf(VirtualTagHostActor.Props(publish.Ref, mux: null, new StubEvaluator()));
|
||||
|
||||
host.Tell(new VirtualTagHostActor.ApplyVirtualTags(
|
||||
new[] { Plan("vt-1", "eq-1", "speed-rpm", folderPath: "metrics") }));
|
||||
|
||||
host.Tell(new VirtualTagActor.EvaluationResult("vt-1", 1.0, DateTime.UtcNow, CorrelationId.NewId()));
|
||||
|
||||
var update = publish.ExpectMsg<OpcUaPublishActor.AttributeValueUpdate>();
|
||||
update.NodeId.ShouldBe("eq-1/metrics/speed-rpm");
|
||||
}
|
||||
|
||||
/// <summary>Stop-removed: a vtag dropped from the desired set is unmapped, so a later result for
|
||||
/// it produces NO publish.</summary>
|
||||
[Fact]
|
||||
public void Removed_vtag_is_no_longer_bridged()
|
||||
{
|
||||
var publish = CreateTestProbe();
|
||||
var host = Sys.ActorOf(VirtualTagHostActor.Props(publish.Ref, mux: null, new StubEvaluator()));
|
||||
|
||||
host.Tell(new VirtualTagHostActor.ApplyVirtualTags(new[] { Plan("vt-1", "eq-1", "speed-rpm") }));
|
||||
// Re-apply without vt-1 — it should be stopped + unmapped.
|
||||
host.Tell(new VirtualTagHostActor.ApplyVirtualTags(Array.Empty<EquipmentVirtualTagPlan>()));
|
||||
|
||||
host.Tell(new VirtualTagActor.EvaluationResult("vt-1", 99.0, DateTime.UtcNow, CorrelationId.NewId()));
|
||||
|
||||
publish.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
||||
}
|
||||
|
||||
/// <summary>Unmapped result dropped: a result for an unknown vtagId is silently ignored.</summary>
|
||||
[Fact]
|
||||
public void Result_for_unknown_vtag_is_dropped()
|
||||
{
|
||||
var publish = CreateTestProbe();
|
||||
var host = Sys.ActorOf(VirtualTagHostActor.Props(publish.Ref, mux: null, new StubEvaluator()));
|
||||
|
||||
host.Tell(new VirtualTagHostActor.ApplyVirtualTags(new[] { Plan("vt-1", "eq-1", "speed-rpm") }));
|
||||
|
||||
host.Tell(new VirtualTagActor.EvaluationResult("vt-unknown", 7.0, DateTime.UtcNow, CorrelationId.NewId()));
|
||||
|
||||
publish.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
||||
}
|
||||
|
||||
/// <summary>Deterministic no-op evaluator: keeps spawned children inert so tests drive the host's
|
||||
/// OnResult path directly via synthetic EvaluationResults.</summary>
|
||||
private sealed class StubEvaluator : IVirtualTagEvaluator
|
||||
{
|
||||
/// <summary>Returns NoChange so the child never emits on its own.</summary>
|
||||
/// <param name="id">The tag identifier.</param>
|
||||
/// <param name="expr">The expression string.</param>
|
||||
/// <param name="deps">The dependency values.</param>
|
||||
/// <returns>A NoChange result.</returns>
|
||||
public VirtualTagEvalResult Evaluate(string id, string expr, IReadOnlyDictionary<string, object?> deps)
|
||||
=> VirtualTagEvalResult.NoChange;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user