merge: equipment-namespace live values (VirtualTag route)
v2-ci / build (push) Failing after 36s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
v2-ci / build (push) Failing after 36s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
This commit is contained in:
@@ -193,10 +193,12 @@ public static class DeploymentArtifact
|
||||
var alarms = ReadArray(root, "ScriptedAlarms", ReadAlarmPlan);
|
||||
var galaxyTags = BuildGalaxyTagPlans(root, drivers);
|
||||
var equipmentTags = BuildEquipmentTagPlans(root);
|
||||
var equipmentVirtualTags = BuildEquipmentVirtualTagPlans(root);
|
||||
|
||||
return new Phase7CompositionResult(areas, lines, equipment, drivers, alarms, galaxyTags)
|
||||
{
|
||||
EquipmentTags = equipmentTags,
|
||||
EquipmentVirtualTags = equipmentVirtualTags,
|
||||
};
|
||||
}
|
||||
catch (JsonException)
|
||||
@@ -251,6 +253,7 @@ public static class DeploymentArtifact
|
||||
full.GalaxyTags.Where(t => sets.DriverIds.Contains(t.DriverInstanceId)).ToArray())
|
||||
{
|
||||
EquipmentTags = full.EquipmentTags.Where(t => sets.DriverIds.Contains(t.DriverInstanceId)).ToArray(),
|
||||
EquipmentVirtualTags = full.EquipmentVirtualTags.Where(v => sets.EquipmentIds.Contains(v.EquipmentId)).ToArray(),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -490,6 +493,91 @@ public static class DeploymentArtifact
|
||||
return result;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Join the artifact's VirtualTags array to its Scripts array (by ScriptId) to emit one
|
||||
/// <see cref="EquipmentVirtualTagPlan"/> per VirtualTag. The artifact-decode mirror of
|
||||
/// <c>Phase7Composer.Compose</c>'s VirtualTag producer — so the compose-side + artifact-decode
|
||||
/// plans agree. <c>Expression</c> = the joined Script's <c>SourceCode</c> (empty when the
|
||||
/// ScriptId is absent); <c>DependencyRefs</c> = the distinct <c>ctx.GetTag("…")</c> literals in
|
||||
/// that source; <c>FolderPath</c> is always "" (VirtualTag has no FolderPath today). Ordered by
|
||||
/// EquipmentId then Name to match the composer's deterministic ordering.
|
||||
/// </summary>
|
||||
private static IReadOnlyList<EquipmentVirtualTagPlan> BuildEquipmentVirtualTagPlans(JsonElement root)
|
||||
{
|
||||
if (!root.TryGetProperty("VirtualTags", out var vtArr) || vtArr.ValueKind != JsonValueKind.Array)
|
||||
return Array.Empty<EquipmentVirtualTagPlan>();
|
||||
|
||||
// scriptId → SourceCode (the expression source the VirtualTagActor evaluates).
|
||||
var scriptSourceById = new Dictionary<string, string>(StringComparer.Ordinal);
|
||||
if (root.TryGetProperty("Scripts", out var scriptsArr) && scriptsArr.ValueKind == JsonValueKind.Array)
|
||||
{
|
||||
foreach (var el in scriptsArr.EnumerateArray())
|
||||
{
|
||||
if (el.ValueKind != JsonValueKind.Object) continue;
|
||||
var sid = el.TryGetProperty("ScriptId", out var sidEl) ? sidEl.GetString() : null;
|
||||
if (string.IsNullOrWhiteSpace(sid)) continue;
|
||||
var src = el.TryGetProperty("SourceCode", out var srcEl) && srcEl.ValueKind == JsonValueKind.String
|
||||
? srcEl.GetString() : null;
|
||||
scriptSourceById[sid!] = src ?? string.Empty;
|
||||
}
|
||||
}
|
||||
|
||||
var result = new List<EquipmentVirtualTagPlan>(vtArr.GetArrayLength());
|
||||
foreach (var el in vtArr.EnumerateArray())
|
||||
{
|
||||
if (el.ValueKind != JsonValueKind.Object) continue;
|
||||
var virtualTagId = el.TryGetProperty("VirtualTagId", out var vidEl) ? vidEl.GetString() : null;
|
||||
var equipmentId = el.TryGetProperty("EquipmentId", out var eqEl) ? eqEl.GetString() : null;
|
||||
var name = el.TryGetProperty("Name", out var nmEl) ? nmEl.GetString() : null;
|
||||
var dataType = el.TryGetProperty("DataType", out var dtEl) ? dtEl.GetString() : null;
|
||||
var scriptId = el.TryGetProperty("ScriptId", out var sidEl) ? sidEl.GetString() : null;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(virtualTagId) || string.IsNullOrWhiteSpace(equipmentId)
|
||||
|| string.IsNullOrWhiteSpace(name)) continue;
|
||||
|
||||
var source = scriptId is not null && scriptSourceById.TryGetValue(scriptId, out var src)
|
||||
? src : string.Empty;
|
||||
|
||||
result.Add(new EquipmentVirtualTagPlan(
|
||||
VirtualTagId: virtualTagId!,
|
||||
EquipmentId: equipmentId!,
|
||||
FolderPath: string.Empty,
|
||||
Name: name!,
|
||||
DataType: dataType ?? "BaseDataType",
|
||||
Expression: source,
|
||||
DependencyRefs: ExtractDependencyRefs(source)));
|
||||
}
|
||||
|
||||
result.Sort((a, b) =>
|
||||
{
|
||||
var byEquipment = string.CompareOrdinal(a.EquipmentId, b.EquipmentId);
|
||||
return byEquipment != 0 ? byEquipment : string.CompareOrdinal(a.Name, b.Name);
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
private static readonly System.Text.RegularExpressions.Regex GetTagRefRegex =
|
||||
new(@"ctx\s*\.\s*GetTag\s*\(\s*""([^""]+)""\s*\)", System.Text.RegularExpressions.RegexOptions.Compiled);
|
||||
|
||||
/// <summary>
|
||||
/// Distinct <c>ctx.GetTag("ref")</c> string literals in a VirtualTag script source, in
|
||||
/// first-seen order. The artifact-decode mirror of <c>Phase7Composer.ExtractDependencyRefs</c>
|
||||
/// — replicated (with the same regex) because Runtime does not reference the OpcUaServer
|
||||
/// compose assembly; kept in sync with that copy.
|
||||
/// </summary>
|
||||
private static IReadOnlyList<string> ExtractDependencyRefs(string scriptSource)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(scriptSource)) return Array.Empty<string>();
|
||||
var seen = new HashSet<string>(StringComparer.Ordinal);
|
||||
var result = new List<string>();
|
||||
foreach (System.Text.RegularExpressions.Match m in GetTagRefRegex.Matches(scriptSource))
|
||||
{
|
||||
var r = m.Groups[1].Value;
|
||||
if (seen.Add(r)) result.Add(r);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Extract the driver-side full reference from a tag's TagConfig JSON (top-level "FullName"
|
||||
/// field). The artifact-decode mirror of <c>Phase7Composer.ExtractTagFullName</c> /
|
||||
|
||||
@@ -3,6 +3,7 @@ using Akka.Actor;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Akka.Event;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
||||
@@ -14,6 +15,7 @@ using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
@@ -52,8 +54,16 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
private readonly IActorRef? _dependencyMux;
|
||||
private readonly IActorRef? _opcUaPublishActor;
|
||||
private readonly IDriverHealthPublisher _healthPublisher;
|
||||
private readonly IVirtualTagEvaluator _virtualTagEvaluator;
|
||||
private readonly IActorRef? _virtualTagHostOverride;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
|
||||
/// <summary>The single VirtualTag-host child that spawns/reconciles Equipment-namespace
|
||||
/// VirtualTagActors and bridges their results onto the OPC UA publish actor. Spawned in
|
||||
/// <see cref="PreStart"/> when an OPC UA publish actor is wired; receives
|
||||
/// <see cref="VirtualTagHostActor.ApplyVirtualTags"/> from <see cref="PushDesiredSubscriptions"/>.</summary>
|
||||
private IActorRef? _virtualTagHost;
|
||||
|
||||
private RevisionHash? _currentRevision;
|
||||
private DeploymentId? _applyingDeploymentId;
|
||||
|
||||
@@ -85,6 +95,14 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
/// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
|
||||
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher"/>
|
||||
/// so test harnesses and smoke fixtures don't need to wire it.</param>
|
||||
/// <param name="virtualTagEvaluator">Optional evaluator handed to the spawned
|
||||
/// <see cref="VirtualTagHostActor"/>'s children; defaults to <see cref="NullVirtualTagEvaluator"/>
|
||||
/// (the dev/Mac path where no expression is evaluated). Production passes the DI-resolved
|
||||
/// Roslyn evaluator.</param>
|
||||
/// <param name="virtualTagHostOverride">Test seam: when supplied, this actor is used as the
|
||||
/// VirtualTag host instead of spawning a real <see cref="VirtualTagHostActor"/> child, so tests
|
||||
/// can intercept the <see cref="VirtualTagHostActor.ApplyVirtualTags"/> message. Null in
|
||||
/// production (the real host is spawned).</param>
|
||||
public static Props Props(
|
||||
IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
|
||||
CommonsNodeId localNode,
|
||||
@@ -93,9 +111,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
IReadOnlySet<string>? localRoles = null,
|
||||
IActorRef? dependencyMux = null,
|
||||
IActorRef? opcUaPublishActor = null,
|
||||
IDriverHealthPublisher? healthPublisher = null) =>
|
||||
IDriverHealthPublisher? healthPublisher = null,
|
||||
IVirtualTagEvaluator? virtualTagEvaluator = null,
|
||||
IActorRef? virtualTagHostOverride = null) =>
|
||||
Akka.Actor.Props.Create(() => new DriverHostActor(
|
||||
dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor, healthPublisher));
|
||||
dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor,
|
||||
healthPublisher, virtualTagEvaluator, virtualTagHostOverride));
|
||||
|
||||
/// <summary>Initializes a new DriverHostActor with the specified dependencies.</summary>
|
||||
/// <param name="dbFactory">Database context factory for configuration database access.</param>
|
||||
@@ -106,6 +127,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
/// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
|
||||
/// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
|
||||
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher"/>.</param>
|
||||
/// <param name="virtualTagEvaluator">Optional evaluator handed to the VirtualTag host's children;
|
||||
/// defaults to <see cref="NullVirtualTagEvaluator"/>.</param>
|
||||
/// <param name="virtualTagHostOverride">Test seam: when supplied, used as the VirtualTag host
|
||||
/// instead of spawning a real <see cref="VirtualTagHostActor"/> child.</param>
|
||||
public DriverHostActor(
|
||||
IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
|
||||
CommonsNodeId localNode,
|
||||
@@ -114,7 +139,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
IReadOnlySet<string>? localRoles = null,
|
||||
IActorRef? dependencyMux = null,
|
||||
IActorRef? opcUaPublishActor = null,
|
||||
IDriverHealthPublisher? healthPublisher = null)
|
||||
IDriverHealthPublisher? healthPublisher = null,
|
||||
IVirtualTagEvaluator? virtualTagEvaluator = null,
|
||||
IActorRef? virtualTagHostOverride = null)
|
||||
{
|
||||
_dbFactory = dbFactory;
|
||||
_localNode = localNode;
|
||||
@@ -124,6 +151,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
_dependencyMux = dependencyMux;
|
||||
_opcUaPublishActor = opcUaPublishActor;
|
||||
_healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
|
||||
_virtualTagEvaluator = virtualTagEvaluator ?? NullVirtualTagEvaluator.Instance;
|
||||
_virtualTagHostOverride = virtualTagHostOverride;
|
||||
|
||||
// Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply.
|
||||
Become(Steady);
|
||||
@@ -136,9 +165,40 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DeploymentsTopic, Self));
|
||||
// Subscribe to driver-control topic so AdminUI Reconnect/Restart commands land here.
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DriverControlTopic, Self));
|
||||
// Spawn the VirtualTag host BEFORE Bootstrap so the bootstrap-restore path (which routes
|
||||
// through PushDesiredSubscriptions and Tells ApplyVirtualTags) has a live host to target.
|
||||
SpawnVirtualTagHost();
|
||||
Bootstrap();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Spawns the single <see cref="VirtualTagHostActor"/> child that owns the Equipment-namespace
|
||||
/// VirtualTagActors and bridges their results onto the OPC UA publish actor. A test-supplied
|
||||
/// <c>virtualTagHostOverride</c> short-circuits the spawn so a probe can intercept
|
||||
/// <see cref="VirtualTagHostActor.ApplyVirtualTags"/>. The real host requires a non-null
|
||||
/// <see cref="_opcUaPublishActor"/> (its ctor throws otherwise), so when no publish actor is
|
||||
/// wired (legacy ControlPlane test harnesses with no OPC UA sink) the host is left null and
|
||||
/// ApplyVirtualTags becomes a no-op — VirtualTags can't have anywhere to publish without it.
|
||||
/// </summary>
|
||||
private void SpawnVirtualTagHost()
|
||||
{
|
||||
if (_virtualTagHostOverride is not null)
|
||||
{
|
||||
_virtualTagHost = _virtualTagHostOverride;
|
||||
return;
|
||||
}
|
||||
|
||||
if (_opcUaPublishActor is null)
|
||||
{
|
||||
_log.Debug("DriverHost {Node}: no OPC UA publish actor wired; skipping VirtualTag host spawn", _localNode);
|
||||
return;
|
||||
}
|
||||
|
||||
_virtualTagHost = Context.ActorOf(
|
||||
VirtualTagHostActor.Props(_opcUaPublishActor, _dependencyMux, _virtualTagEvaluator),
|
||||
"virtual-tag-host");
|
||||
}
|
||||
|
||||
private void Bootstrap()
|
||||
{
|
||||
// Read the most-recent NodeDeploymentState for this node; if it's Applied, jump
|
||||
@@ -459,6 +519,22 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
_log.Info("DriverHost {Node}: SubscribeBulk pushed {Refs} references across {Drivers} driver(s)",
|
||||
_localNode, total, refsByDriver.Count);
|
||||
}
|
||||
|
||||
// Hand the Equipment-namespace VirtualTags to the host so it spawns/reconciles a
|
||||
// VirtualTagActor per plan and streams their evaluated values back onto the just-rebuilt
|
||||
// address space. Runs on BOTH the fresh-apply path (ApplyAndAck) and the bootstrap-restore
|
||||
// path (RestoreApplied) because both call this method, so one send covers both.
|
||||
// NOTE: the Stale-recovery path (TryRecoverFromStale) does NOT call PushDesiredSubscriptions,
|
||||
// so — like drivers — VirtualTags remain empty after a Stale recovery until the next
|
||||
// deployment dispatch. This is intentional and consistent with driver recovery: the Stale
|
||||
// path only restores the revision marker + NodeDeploymentState; a subsequent dispatch
|
||||
// (or a redeploy from AdminUI) triggers the full apply + subscribe pass.
|
||||
_virtualTagHost?.Tell(new VirtualTagHostActor.ApplyVirtualTags(composition.EquipmentVirtualTags));
|
||||
if (composition.EquipmentVirtualTags.Count > 0)
|
||||
{
|
||||
_log.Info("DriverHost {Node}: applied {Count} Equipment VirtualTag(s) to the VirtualTag host",
|
||||
_localNode, composition.EquipmentVirtualTags.Count);
|
||||
}
|
||||
}
|
||||
|
||||
private void SpawnChild(DriverInstanceSpec spec)
|
||||
|
||||
@@ -238,6 +238,11 @@ public sealed class OpcUaPublishActor : ReceiveActor
|
||||
// clients can browse them. Live values arrive in a later milestone; until then the
|
||||
// variables show BadWaitingForInitialData.
|
||||
_applier.MaterialiseEquipmentTags(composition);
|
||||
// Equipment-namespace VirtualTags get their own pass right after the equipment tags:
|
||||
// ensures each computed signal's Variable (and any FolderPath sub-folder) exists under its
|
||||
// equipment folder with a folder-scoped NodeId. The VirtualTagActor fills live values in a
|
||||
// later milestone; until then the variables show BadWaitingForInitialData (same as tags).
|
||||
_applier.MaterialiseEquipmentVirtualTags(composition);
|
||||
|
||||
OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair<string, object?>("kind", "rebuild"));
|
||||
_log.Info("OpcUaPublish: applied rebuild (correlation={Correlation}, added={Added}, removed={Removed}, changed={Changed}, rebuild={Rebuild})",
|
||||
|
||||
@@ -5,6 +5,7 @@ using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
@@ -89,6 +90,16 @@ public static class ServiceCollectionExtensions
|
||||
var serviceLevel = resolver.GetService<IServiceLevelPublisher>() ?? NullServiceLevelPublisher.Instance;
|
||||
var loggerFactory = resolver.GetService<ILoggerFactory>() ?? NullLoggerFactory.Instance;
|
||||
var healthPublisher = resolver.GetService<IDriverHealthPublisher>() ?? NullDriverHealthPublisher.Instance;
|
||||
// Production evaluator is the Host's RoslynVirtualTagEvaluator (registered as
|
||||
// IVirtualTagEvaluator); fall back to the null evaluator for test harnesses that don't
|
||||
// register one (VirtualTagActor children then evaluate to nothing).
|
||||
var virtualTagEvaluator = resolver.GetService<IVirtualTagEvaluator>();
|
||||
if (virtualTagEvaluator is null)
|
||||
{
|
||||
loggerFactory.CreateLogger("ZB.MOM.WW.OtOpcUa.Runtime.ServiceCollectionExtensions")
|
||||
.LogWarning("IVirtualTagEvaluator not registered; Equipment VirtualTags will evaluate to NoChange (no live values). Expected only in test harnesses — driver-role nodes should register RoslynVirtualTagEvaluator.");
|
||||
virtualTagEvaluator = NullVirtualTagEvaluator.Instance;
|
||||
}
|
||||
|
||||
var dbHealth = system.ActorOf(
|
||||
DbHealthProbeActor.Props(dbFactory),
|
||||
@@ -119,7 +130,8 @@ public static class ServiceCollectionExtensions
|
||||
driverFactory: driverFactory, localRoles: roleInfo.LocalRoles,
|
||||
dependencyMux: mux,
|
||||
opcUaPublishActor: publishActor,
|
||||
healthPublisher: healthPublisher),
|
||||
healthPublisher: healthPublisher,
|
||||
virtualTagEvaluator: virtualTagEvaluator),
|
||||
DriverHostActorName);
|
||||
registry.Register<DriverHostActorKey>(driverHost);
|
||||
|
||||
|
||||
@@ -0,0 +1,156 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Event;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||
|
||||
/// <summary>
|
||||
/// Supervisor that gives Equipment-namespace VirtualTags live values. For each
|
||||
/// <see cref="EquipmentVirtualTagPlan"/> in the desired set it spawns one child
|
||||
/// <see cref="VirtualTagActor"/> (which self-registers with the dependency mux and evaluates its
|
||||
/// expression on dependency changes) and remembers the plan's <b>folder-scoped NodeId</b>. When a
|
||||
/// child reports a fresh <see cref="VirtualTagActor.EvaluationResult"/>, the host bridges it onto
|
||||
/// an <see cref="OpcUaPublishActor.AttributeValueUpdate"/> targeting that NodeId so the
|
||||
/// already-materialised Variable node (currently BadWaitingForInitialData) reflects the value.
|
||||
///
|
||||
/// <para>
|
||||
/// The published NodeId is computed with the <b>identical</b> formula
|
||||
/// <c>Phase7Applier.MaterialiseEquipmentVirtualTags</c> uses to materialise the variable —
|
||||
/// <c>{parent}/{Name}</c> where <c>parent = IsNullOrWhiteSpace(FolderPath) ? EquipmentId :
|
||||
/// {EquipmentId}/{FolderPath}</c> — or the value would land on a NodeId that does not exist.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
public sealed class VirtualTagHostActor : ReceiveActor
|
||||
{
|
||||
/// <summary>Reconciles the live VirtualTag children to exactly the supplied desired set:
|
||||
/// stops children whose vtagId is gone, spawns children for new vtagIds, and rebuilds the
|
||||
/// vtagId→NodeId map so renames are reflected.</summary>
|
||||
/// <param name="Plans">The desired Equipment-namespace VirtualTag plans.</param>
|
||||
public sealed record ApplyVirtualTags(IReadOnlyList<EquipmentVirtualTagPlan> Plans);
|
||||
|
||||
private readonly IActorRef _publishActor;
|
||||
private readonly IActorRef? _mux;
|
||||
private readonly IVirtualTagEvaluator _evaluator;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
|
||||
// vtagId -> spawned child VirtualTagActor.
|
||||
private readonly Dictionary<string, IActorRef> _children = new(StringComparer.Ordinal);
|
||||
// vtagId -> folder-scoped OPC UA NodeId the materialiser placed the variable at.
|
||||
private readonly Dictionary<string, string> _nodeIdByVtag = new(StringComparer.Ordinal);
|
||||
|
||||
/// <summary>Factory method to create Props for a VirtualTagHostActor.</summary>
|
||||
/// <param name="publishActor">The OPC UA publish actor that consumes
|
||||
/// <see cref="OpcUaPublishActor.AttributeValueUpdate"/> bridged from child results.</param>
|
||||
/// <param name="mux">Optional dependency multiplexer; passed to each spawned child so it can
|
||||
/// register interest in its dependency refs. Null on the dev/Mac path (no live values).</param>
|
||||
/// <param name="evaluator">The evaluator each child uses to compute its expression.</param>
|
||||
public static Props Props(IActorRef publishActor, IActorRef? mux, IVirtualTagEvaluator evaluator) =>
|
||||
Akka.Actor.Props.Create(() => new VirtualTagHostActor(publishActor, mux, evaluator));
|
||||
|
||||
/// <summary>Initializes a new instance of the <see cref="VirtualTagHostActor"/> class.</summary>
|
||||
/// <param name="publishActor">The OPC UA publish actor results are bridged to.</param>
|
||||
/// <param name="mux">Optional dependency multiplexer passed to each spawned child.</param>
|
||||
/// <param name="evaluator">The evaluator each child uses to compute its expression.</param>
|
||||
public VirtualTagHostActor(IActorRef publishActor, IActorRef? mux, IVirtualTagEvaluator evaluator)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(publishActor);
|
||||
ArgumentNullException.ThrowIfNull(evaluator);
|
||||
_publishActor = publishActor;
|
||||
_mux = mux;
|
||||
_evaluator = evaluator;
|
||||
|
||||
Receive<ApplyVirtualTags>(OnApply);
|
||||
Receive<VirtualTagActor.EvaluationResult>(OnResult);
|
||||
Receive<Terminated>(OnChildTerminated);
|
||||
}
|
||||
|
||||
private void OnApply(ApplyVirtualTags msg)
|
||||
{
|
||||
var desired = new HashSet<string>(msg.Plans.Select(p => p.VirtualTagId), StringComparer.Ordinal);
|
||||
|
||||
// Stop + forget children whose vtagId is no longer desired. Stopping the child triggers its
|
||||
// PostStop, which unregisters its interest from the mux.
|
||||
foreach (var vtagId in _children.Keys.Where(id => !desired.Contains(id)).ToList())
|
||||
{
|
||||
Context.Stop(_children[vtagId]);
|
||||
_children.Remove(vtagId);
|
||||
}
|
||||
|
||||
// Rebuild the NodeId map every apply so renames (Name/FolderPath/EquipmentId changes) are
|
||||
// picked up. The map only contains currently-desired vtags, so a result for a removed vtag
|
||||
// finds no entry and is dropped.
|
||||
_nodeIdByVtag.Clear();
|
||||
foreach (var p in msg.Plans)
|
||||
{
|
||||
_nodeIdByVtag[p.VirtualTagId] = NodeIdFor(p);
|
||||
}
|
||||
|
||||
// Spawn children for new vtagIds only — existing children keep their mux subscriptions and
|
||||
// last-value dedup state. Expression/dependency changes on an existing vtag are NOT
|
||||
// re-applied here; the loader's vtags are stable, and a future enhancement can stop+respawn
|
||||
// a child whose plan changed (the diff already identifies ChangedEquipmentVirtualTags).
|
||||
foreach (var p in msg.Plans)
|
||||
{
|
||||
// TODO(equipment-virtualtags): when a plan's Expression/DependencyRefs change in place
|
||||
// (ChangedEquipmentVirtualTags), stop+respawn the child here; today only spawn-new/stop-removed
|
||||
// is handled (loader vtags are stable).
|
||||
if (_children.ContainsKey(p.VirtualTagId)) continue;
|
||||
|
||||
// Auto-name the child: vtagIds can contain characters illegal in actor names, so let Akka
|
||||
// assign a safe unique name. The child self-registers with the mux in PreStart.
|
||||
var child = Context.ActorOf(VirtualTagActor.Props(
|
||||
virtualTagId: p.VirtualTagId,
|
||||
expression: p.Expression,
|
||||
evaluator: _evaluator,
|
||||
scriptId: p.VirtualTagId,
|
||||
publisherFactory: null,
|
||||
dependencyRefs: p.DependencyRefs,
|
||||
mux: _mux));
|
||||
Context.Watch(child);
|
||||
_children[p.VirtualTagId] = child;
|
||||
_log.Debug("VirtualTagHost: spawned child for vtag {VirtualTagId}", p.VirtualTagId);
|
||||
}
|
||||
|
||||
_log.Debug("VirtualTagHost: applied (desired={Desired}, children={Children})",
|
||||
desired.Count, _children.Count);
|
||||
}
|
||||
|
||||
private void OnResult(VirtualTagActor.EvaluationResult result)
|
||||
{
|
||||
// A result may arrive for a vtag that was just removed from the desired set (the child's
|
||||
// last in-flight message). With no NodeId mapping we have nowhere to land it — drop silently.
|
||||
if (!_nodeIdByVtag.TryGetValue(result.VirtualTagId, out var nodeId))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_publishActor.Tell(new OpcUaPublishActor.AttributeValueUpdate(
|
||||
nodeId, result.Value, OpcUaQuality.Good, result.TimestampUtc));
|
||||
}
|
||||
|
||||
private void OnChildTerminated(Terminated msg)
|
||||
{
|
||||
var stale = _children.Where(kv => kv.Value.Equals(msg.ActorRef)).Select(kv => kv.Key).ToList();
|
||||
foreach (var id in stale)
|
||||
{
|
||||
_children.Remove(id);
|
||||
// NodeId map is rebuilt on the next ApplyVirtualTags; leaving the mapping is harmless
|
||||
// (no child will publish for it until respawned). A dead child is respawned on next apply.
|
||||
_log.Warning("VirtualTagHost: child for vtag {VirtualTagId} terminated; will respawn on next apply", id);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Folder-scoped NodeId for a VirtualTag plan — MUST match
|
||||
/// <c>Phase7Applier.MaterialiseEquipmentVirtualTags</c> exactly, or the published value lands on a
|
||||
/// NodeId that was never materialised.</summary>
|
||||
private static string NodeIdFor(EquipmentVirtualTagPlan p)
|
||||
{
|
||||
var parent = string.IsNullOrWhiteSpace(p.FolderPath)
|
||||
? p.EquipmentId
|
||||
: $"{p.EquipmentId}/{p.FolderPath}";
|
||||
return $"{parent}/{p.Name}";
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user