feat(vtags): forward historized vtag results to IHistoryWriter (H5c, stillpending §1)
This commit is contained in:
@@ -2,6 +2,8 @@ using Akka.Actor;
|
||||
using Akka.Event;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||
|
||||
@@ -33,6 +35,9 @@ public sealed class VirtualTagHostActor : ReceiveActor
|
||||
private readonly IActorRef _publishActor;
|
||||
private readonly IActorRef? _mux;
|
||||
private readonly IVirtualTagEvaluator _evaluator;
|
||||
// Sink for historized VirtualTag results (plans with Historize=true). NullHistoryWriter when no
|
||||
// durable historian is wired, so OnResult always has a non-null target.
|
||||
private readonly IHistoryWriter _history;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
|
||||
// vtagId -> spawned child VirtualTagActor.
|
||||
@@ -50,20 +55,27 @@ public sealed class VirtualTagHostActor : ReceiveActor
|
||||
/// <param name="mux">Optional dependency multiplexer; passed to each spawned child so it can
|
||||
/// register interest in its dependency refs. Null on the dev/Mac path (no live values).</param>
|
||||
/// <param name="evaluator">The evaluator each child uses to compute its expression.</param>
|
||||
public static Props Props(IActorRef publishActor, IActorRef? mux, IVirtualTagEvaluator evaluator) =>
|
||||
Akka.Actor.Props.Create(() => new VirtualTagHostActor(publishActor, mux, evaluator));
|
||||
/// <param name="historyWriter">Sink for results whose plan has <c>Historize=true</c>. Null ⇒
|
||||
/// <see cref="NullHistoryWriter.Instance"/> (no durable historian wired), so existing call sites
|
||||
/// compile unchanged and never historize.</param>
|
||||
public static Props Props(IActorRef publishActor, IActorRef? mux, IVirtualTagEvaluator evaluator,
|
||||
IHistoryWriter? historyWriter = null) =>
|
||||
Akka.Actor.Props.Create(() => new VirtualTagHostActor(publishActor, mux, evaluator, historyWriter));
|
||||
|
||||
/// <summary>Initializes a new instance of the <see cref="VirtualTagHostActor"/> class.</summary>
|
||||
/// <param name="publishActor">The OPC UA publish actor results are bridged to.</param>
|
||||
/// <param name="mux">Optional dependency multiplexer passed to each spawned child.</param>
|
||||
/// <param name="evaluator">The evaluator each child uses to compute its expression.</param>
|
||||
public VirtualTagHostActor(IActorRef publishActor, IActorRef? mux, IVirtualTagEvaluator evaluator)
|
||||
/// <param name="historyWriter">Sink for historized results; null ⇒ <see cref="NullHistoryWriter.Instance"/>.</param>
|
||||
public VirtualTagHostActor(IActorRef publishActor, IActorRef? mux, IVirtualTagEvaluator evaluator,
|
||||
IHistoryWriter? historyWriter = null)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(publishActor);
|
||||
ArgumentNullException.ThrowIfNull(evaluator);
|
||||
_publishActor = publishActor;
|
||||
_mux = mux;
|
||||
_evaluator = evaluator;
|
||||
_history = historyWriter ?? NullHistoryWriter.Instance;
|
||||
|
||||
Receive<ApplyVirtualTags>(OnApply);
|
||||
Receive<VirtualTagActor.EvaluationResult>(OnResult);
|
||||
@@ -154,6 +166,15 @@ public sealed class VirtualTagHostActor : ReceiveActor
|
||||
|
||||
_publishActor.Tell(new OpcUaPublishActor.AttributeValueUpdate(
|
||||
nodeId, result.Value, OpcUaQuality.Good, result.TimestampUtc));
|
||||
|
||||
// Historize iff the plan opted in. Reuses _planByVtag (kept in lock-step with _children), so
|
||||
// no parallel map. The historian path key is the SAME folder-scoped NodeId we just published
|
||||
// to. For a computed value source == server, so both timestamps are the evaluation time.
|
||||
if (_planByVtag.TryGetValue(result.VirtualTagId, out var plan) && plan.Historize)
|
||||
{
|
||||
_history.Record(nodeId, new DataValueSnapshot(
|
||||
result.Value, 0u /* StatusCodes.Good */, result.TimestampUtc, result.TimestampUtc));
|
||||
}
|
||||
}
|
||||
|
||||
private void OnChildTerminated(Terminated msg)
|
||||
|
||||
@@ -32,6 +32,9 @@
|
||||
<!-- IScriptLogPublisher lives in Core.Scripting; DpsScriptLogPublisher implements it
|
||||
here so the concrete Akka DPS routing stays out of the Core layer. -->
|
||||
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Core.Scripting\ZB.MOM.WW.OtOpcUa.Core.Scripting.csproj"/>
|
||||
<!-- IHistoryWriter / NullHistoryWriter live in Core.VirtualTags; VirtualTagHostActor forwards
|
||||
historized VirtualTag results to it (H5c). The durable sink is wired by the host's DI. -->
|
||||
<ProjectReference Include="..\..\Core\ZB.MOM.WW.OtOpcUa.Core.VirtualTags\ZB.MOM.WW.OtOpcUa.Core.VirtualTags.csproj"/>
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@@ -4,6 +4,8 @@ using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
@@ -153,6 +155,71 @@ public sealed class VirtualTagHostActorTests : RuntimeActorTestBase
|
||||
secondChild.ShouldNotBe(firstChild);
|
||||
}
|
||||
|
||||
/// <summary>A plan with an explicit Historize flag, so H5c can assert the host historizes a
|
||||
/// result iff the plan opted in. Mirrors <see cref="Plan"/> but threads <paramref name="historize"/>.</summary>
|
||||
private static EquipmentVirtualTagPlan PlanH(
|
||||
string vtagId, string equipmentId, string name, bool historize, string folderPath = "") =>
|
||||
new(
|
||||
VirtualTagId: vtagId,
|
||||
EquipmentId: equipmentId,
|
||||
FolderPath: folderPath,
|
||||
Name: name,
|
||||
DataType: "Double",
|
||||
Expression: "ctx.GetTag(\"a\")",
|
||||
DependencyRefs: new[] { "a" },
|
||||
Historize: historize);
|
||||
|
||||
/// <summary>H5c: a result for a vtag whose plan has Historize=true is recorded with the
|
||||
/// IHistoryWriter under the SAME folder-scoped NodeId the value was published to, carrying the
|
||||
/// result value and OPC UA Good (0u) status — in addition to the normal publish.</summary>
|
||||
[Fact]
|
||||
public void Historized_vtag_result_is_recorded_with_the_history_writer()
|
||||
{
|
||||
var publish = CreateTestProbe();
|
||||
var writer = new CapturingHistoryWriter();
|
||||
var host = Sys.ActorOf(VirtualTagHostActor.Props(publish.Ref, mux: null, new StubEvaluator(), writer));
|
||||
|
||||
host.Tell(new VirtualTagHostActor.ApplyVirtualTags(
|
||||
new[] { PlanH("vt-1", "eq-1", "speed-rpm", historize: true) }));
|
||||
|
||||
var ts = new DateTime(2026, 6, 7, 12, 0, 0, DateTimeKind.Utc);
|
||||
host.Tell(new VirtualTagActor.EvaluationResult("vt-1", 42.0, ts, CorrelationId.NewId()));
|
||||
|
||||
// The publish still happens — historization is additive, not a replacement.
|
||||
var update = publish.ExpectMsg<OpcUaPublishActor.AttributeValueUpdate>();
|
||||
update.NodeId.ShouldBe("eq-1/speed-rpm");
|
||||
|
||||
// Wait for the history record to land (delivered on the same actor turn as the publish).
|
||||
AwaitAssert(() => writer.Calls.Count.ShouldBe(1));
|
||||
var (path, value) = writer.Calls[0];
|
||||
path.ShouldBe("eq-1/speed-rpm");
|
||||
value.Value.ShouldBe(42.0);
|
||||
value.StatusCode.ShouldBe(0u); // OPC UA Good
|
||||
value.SourceTimestampUtc.ShouldBe(ts);
|
||||
value.ServerTimestampUtc.ShouldBe(ts);
|
||||
}
|
||||
|
||||
/// <summary>H5c: a result for a vtag whose plan has Historize=false is NOT recorded — the writer
|
||||
/// is never called — though the value is still published.</summary>
|
||||
[Fact]
|
||||
public void Non_historized_vtag_result_is_not_recorded()
|
||||
{
|
||||
var publish = CreateTestProbe();
|
||||
var writer = new CapturingHistoryWriter();
|
||||
var host = Sys.ActorOf(VirtualTagHostActor.Props(publish.Ref, mux: null, new StubEvaluator(), writer));
|
||||
|
||||
host.Tell(new VirtualTagHostActor.ApplyVirtualTags(
|
||||
new[] { PlanH("vt-1", "eq-1", "speed-rpm", historize: false) }));
|
||||
|
||||
host.Tell(new VirtualTagActor.EvaluationResult("vt-1", 42.0, DateTime.UtcNow, CorrelationId.NewId()));
|
||||
|
||||
// The value is still published…
|
||||
publish.ExpectMsg<OpcUaPublishActor.AttributeValueUpdate>();
|
||||
// …but the historian was never touched.
|
||||
publish.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
|
||||
writer.Calls.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
/// <summary>A plan with an explicit Expression + DependencyRefs (the H1b in-place-change case).</summary>
|
||||
private static EquipmentVirtualTagPlan PlanWithRefs(
|
||||
string vtagId, string equipmentId, string name, string expression, params string[] refs) =>
|
||||
@@ -277,6 +344,18 @@ public sealed class VirtualTagHostActorTests : RuntimeActorTestBase
|
||||
mux.LastSender.ShouldNotBe(firstChild);
|
||||
}
|
||||
|
||||
/// <summary>Capturing <see cref="IHistoryWriter"/>: records every Record call so H5c tests can
|
||||
/// assert the host historizes (or does not) and with what path + snapshot.</summary>
|
||||
private sealed class CapturingHistoryWriter : IHistoryWriter
|
||||
{
|
||||
public readonly List<(string Path, DataValueSnapshot Value)> Calls = new();
|
||||
|
||||
/// <summary>Captures the path + snapshot of a Record call.</summary>
|
||||
/// <param name="path">The virtual tag path.</param>
|
||||
/// <param name="value">The data value snapshot.</param>
|
||||
public void Record(string path, DataValueSnapshot value) => Calls.Add((path, value));
|
||||
}
|
||||
|
||||
/// <summary>Deterministic no-op evaluator: keeps spawned children inert so tests drive the host's
|
||||
/// OnResult path directly via synthetic EvaluationResults.</summary>
|
||||
private sealed class StubEvaluator : IVirtualTagEvaluator
|
||||
|
||||
Reference in New Issue
Block a user