feat(vtags): wire IHistoryWriter through DriverHostActor (Null default; durable sink infra-gated) (H5d, stillpending §1)
This commit is contained in:
@@ -21,6 +21,7 @@ using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
|||||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||||
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
|
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
|
||||||
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||||
@@ -65,6 +66,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
private readonly IActorRef? _opcUaPublishActor;
|
private readonly IActorRef? _opcUaPublishActor;
|
||||||
private readonly IDriverHealthPublisher _healthPublisher;
|
private readonly IDriverHealthPublisher _healthPublisher;
|
||||||
private readonly IVirtualTagEvaluator _virtualTagEvaluator;
|
private readonly IVirtualTagEvaluator _virtualTagEvaluator;
|
||||||
|
private readonly IHistoryWriter _historyWriter;
|
||||||
private readonly IActorRef? _virtualTagHostOverride;
|
private readonly IActorRef? _virtualTagHostOverride;
|
||||||
private readonly ILoggerFactory _loggerFactory;
|
private readonly ILoggerFactory _loggerFactory;
|
||||||
private readonly ScriptRootLogger? _scriptRootLogger;
|
private readonly ScriptRootLogger? _scriptRootLogger;
|
||||||
@@ -196,6 +198,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
/// <see cref="VirtualTagHostActor"/>'s children; defaults to <see cref="NullVirtualTagEvaluator"/>
|
/// <see cref="VirtualTagHostActor"/>'s children; defaults to <see cref="NullVirtualTagEvaluator"/>
|
||||||
/// (the dev/Mac path where no expression is evaluated). Production passes the DI-resolved
|
/// (the dev/Mac path where no expression is evaluated). Production passes the DI-resolved
|
||||||
/// Roslyn evaluator.</param>
|
/// Roslyn evaluator.</param>
|
||||||
|
/// <param name="historyWriter">Optional sink handed to the spawned <see cref="VirtualTagHostActor"/>
|
||||||
|
/// for VirtualTag results whose plan opted into <c>Historize=true</c>; defaults to
|
||||||
|
/// <see cref="NullHistoryWriter"/> (the durable AVEVA sink is infra-gated, so no live-data historian
|
||||||
|
/// write RPC exists). A deployment that binds a real <see cref="IHistoryWriter"/> in DI overrides it.</param>
|
||||||
/// <param name="virtualTagHostOverride">Test seam: when supplied, this actor is used as the
|
/// <param name="virtualTagHostOverride">Test seam: when supplied, this actor is used as the
|
||||||
/// VirtualTag host instead of spawning a real <see cref="VirtualTagHostActor"/> child, so tests
|
/// VirtualTag host instead of spawning a real <see cref="VirtualTagHostActor"/> child, so tests
|
||||||
/// can intercept the <see cref="VirtualTagHostActor.ApplyVirtualTags"/> message. Null in
|
/// can intercept the <see cref="VirtualTagHostActor.ApplyVirtualTags"/> message. Null in
|
||||||
@@ -220,13 +226,14 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
IActorRef? opcUaPublishActor = null,
|
IActorRef? opcUaPublishActor = null,
|
||||||
IDriverHealthPublisher? healthPublisher = null,
|
IDriverHealthPublisher? healthPublisher = null,
|
||||||
IVirtualTagEvaluator? virtualTagEvaluator = null,
|
IVirtualTagEvaluator? virtualTagEvaluator = null,
|
||||||
|
IHistoryWriter? historyWriter = null,
|
||||||
IActorRef? virtualTagHostOverride = null,
|
IActorRef? virtualTagHostOverride = null,
|
||||||
ILoggerFactory? loggerFactory = null,
|
ILoggerFactory? loggerFactory = null,
|
||||||
ScriptRootLogger? scriptRootLogger = null,
|
ScriptRootLogger? scriptRootLogger = null,
|
||||||
IActorRef? scriptedAlarmHostOverride = null) =>
|
IActorRef? scriptedAlarmHostOverride = null) =>
|
||||||
Akka.Actor.Props.Create(() => new DriverHostActor(
|
Akka.Actor.Props.Create(() => new DriverHostActor(
|
||||||
dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor,
|
dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor,
|
||||||
healthPublisher, virtualTagEvaluator, virtualTagHostOverride,
|
healthPublisher, virtualTagEvaluator, historyWriter, virtualTagHostOverride,
|
||||||
loggerFactory, scriptRootLogger, scriptedAlarmHostOverride));
|
loggerFactory, scriptRootLogger, scriptedAlarmHostOverride));
|
||||||
|
|
||||||
/// <summary>Initializes a new DriverHostActor with the specified dependencies.</summary>
|
/// <summary>Initializes a new DriverHostActor with the specified dependencies.</summary>
|
||||||
@@ -240,6 +247,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher"/>.</param>
|
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher"/>.</param>
|
||||||
/// <param name="virtualTagEvaluator">Optional evaluator handed to the VirtualTag host's children;
|
/// <param name="virtualTagEvaluator">Optional evaluator handed to the VirtualTag host's children;
|
||||||
/// defaults to <see cref="NullVirtualTagEvaluator"/>.</param>
|
/// defaults to <see cref="NullVirtualTagEvaluator"/>.</param>
|
||||||
|
/// <param name="historyWriter">Optional sink handed to the spawned <see cref="VirtualTagHostActor"/>
|
||||||
|
/// for historized VirtualTag results; defaults to <see cref="NullHistoryWriter"/>.</param>
|
||||||
/// <param name="virtualTagHostOverride">Test seam: when supplied, used as the VirtualTag host
|
/// <param name="virtualTagHostOverride">Test seam: when supplied, used as the VirtualTag host
|
||||||
/// instead of spawning a real <see cref="VirtualTagHostActor"/> child.</param>
|
/// instead of spawning a real <see cref="VirtualTagHostActor"/> child.</param>
|
||||||
/// <param name="loggerFactory">Optional logger factory used to create the
|
/// <param name="loggerFactory">Optional logger factory used to create the
|
||||||
@@ -258,6 +267,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
IActorRef? opcUaPublishActor = null,
|
IActorRef? opcUaPublishActor = null,
|
||||||
IDriverHealthPublisher? healthPublisher = null,
|
IDriverHealthPublisher? healthPublisher = null,
|
||||||
IVirtualTagEvaluator? virtualTagEvaluator = null,
|
IVirtualTagEvaluator? virtualTagEvaluator = null,
|
||||||
|
IHistoryWriter? historyWriter = null,
|
||||||
IActorRef? virtualTagHostOverride = null,
|
IActorRef? virtualTagHostOverride = null,
|
||||||
ILoggerFactory? loggerFactory = null,
|
ILoggerFactory? loggerFactory = null,
|
||||||
ScriptRootLogger? scriptRootLogger = null,
|
ScriptRootLogger? scriptRootLogger = null,
|
||||||
@@ -272,6 +282,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
_opcUaPublishActor = opcUaPublishActor;
|
_opcUaPublishActor = opcUaPublishActor;
|
||||||
_healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
|
_healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
|
||||||
_virtualTagEvaluator = virtualTagEvaluator ?? NullVirtualTagEvaluator.Instance;
|
_virtualTagEvaluator = virtualTagEvaluator ?? NullVirtualTagEvaluator.Instance;
|
||||||
|
_historyWriter = historyWriter ?? NullHistoryWriter.Instance;
|
||||||
_virtualTagHostOverride = virtualTagHostOverride;
|
_virtualTagHostOverride = virtualTagHostOverride;
|
||||||
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
|
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
|
||||||
_scriptRootLogger = scriptRootLogger;
|
_scriptRootLogger = scriptRootLogger;
|
||||||
@@ -329,7 +340,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
}
|
}
|
||||||
|
|
||||||
_virtualTagHost = Context.ActorOf(
|
_virtualTagHost = Context.ActorOf(
|
||||||
VirtualTagHostActor.Props(_opcUaPublishActor, _dependencyMux, _virtualTagEvaluator),
|
VirtualTagHostActor.Props(_opcUaPublishActor, _dependencyMux, _virtualTagEvaluator, _historyWriter),
|
||||||
"virtual-tag-host");
|
"virtual-tag-host");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ using ZB.MOM.WW.OtOpcUa.Configuration;
|
|||||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||||
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||||
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
|
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
|
||||||
@@ -43,6 +44,10 @@ public static class ServiceCollectionExtensions
|
|||||||
{
|
{
|
||||||
services.TryAddSingleton<IAlarmHistorianSink>(NullAlarmHistorianSink.Instance);
|
services.TryAddSingleton<IAlarmHistorianSink>(NullAlarmHistorianSink.Instance);
|
||||||
services.TryAddSingleton<IHistorianDataSource>(NullHistorianDataSource.Instance);
|
services.TryAddSingleton<IHistorianDataSource>(NullHistorianDataSource.Instance);
|
||||||
|
// VirtualTag historization sink. Null default — the durable AVEVA sink is infra-gated (there is
|
||||||
|
// no live-data historian write RPC). TryAddSingleton so a deployment that bound a real
|
||||||
|
// IHistoryWriter earlier wins.
|
||||||
|
services.TryAddSingleton<IHistoryWriter>(NullHistoryWriter.Instance);
|
||||||
services.TryAddSingleton<IDriverFactory>(NullDriverFactory.Instance);
|
services.TryAddSingleton<IDriverFactory>(NullDriverFactory.Instance);
|
||||||
services.TryAddSingleton<IOpcUaAddressSpaceSink>(NullOpcUaAddressSpaceSink.Instance);
|
services.TryAddSingleton<IOpcUaAddressSpaceSink>(NullOpcUaAddressSpaceSink.Instance);
|
||||||
services.TryAddSingleton<IServiceLevelPublisher>(NullServiceLevelPublisher.Instance);
|
services.TryAddSingleton<IServiceLevelPublisher>(NullServiceLevelPublisher.Instance);
|
||||||
@@ -184,6 +189,10 @@ public static class ServiceCollectionExtensions
|
|||||||
virtualTagEvaluator = NullVirtualTagEvaluator.Instance;
|
virtualTagEvaluator = NullVirtualTagEvaluator.Instance;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// VirtualTag historization sink threaded to the spawned VirtualTagHostActor. Null default
|
||||||
|
// (durable AVEVA sink is infra-gated); a deployment binding a real IHistoryWriter overrides.
|
||||||
|
var historyWriter = resolver.GetService<IHistoryWriter>() ?? NullHistoryWriter.Instance;
|
||||||
|
|
||||||
var dbHealth = system.ActorOf(
|
var dbHealth = system.ActorOf(
|
||||||
DbHealthProbeActor.Props(dbFactory),
|
DbHealthProbeActor.Props(dbFactory),
|
||||||
DbHealthProbeActorName);
|
DbHealthProbeActorName);
|
||||||
@@ -215,6 +224,7 @@ public static class ServiceCollectionExtensions
|
|||||||
opcUaPublishActor: publishActor,
|
opcUaPublishActor: publishActor,
|
||||||
healthPublisher: healthPublisher,
|
healthPublisher: healthPublisher,
|
||||||
virtualTagEvaluator: virtualTagEvaluator,
|
virtualTagEvaluator: virtualTagEvaluator,
|
||||||
|
historyWriter: historyWriter,
|
||||||
loggerFactory: loggerFactory,
|
loggerFactory: loggerFactory,
|
||||||
scriptRootLogger: scriptRootLogger),
|
scriptRootLogger: scriptRootLogger),
|
||||||
DriverHostActorName);
|
DriverHostActorName);
|
||||||
|
|||||||
+127
@@ -0,0 +1,127 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Text.Json;
|
||||||
|
using Akka.Actor;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// H5d (DI seam): pins that <see cref="DriverHostActor.Props"/> accepts an
|
||||||
|
/// <see cref="IHistoryWriter"/> and threads it through to the REAL spawned
|
||||||
|
/// <see cref="VirtualTagHostActor"/> (the <c>virtualTagHostOverride</c> is deliberately NOT used,
|
||||||
|
/// so the production spawn path that wires <c>historyWriter</c> is exercised).
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// This is a construction/threading smoke test, not a record-behaviour test. The record path —
|
||||||
|
/// "a Historize=true result is recorded with the IHistoryWriter under the folder-scoped NodeId" —
|
||||||
|
/// is already covered end-to-end against the host directly by
|
||||||
|
/// <c>VirtualTagHostActorTests.Historized_vtag_result_is_recorded_with_the_history_writer</c> (H5c).
|
||||||
|
/// Driving a real spawned child to emit a synthetic <see cref="VirtualTagActor.EvaluationResult"/>
|
||||||
|
/// from inside <see cref="DriverHostActor"/> would require reaching the auto-named grandchild and a
|
||||||
|
/// live evaluator/mux feed — heavy and brittle for no extra coverage. So here we assert the writer
|
||||||
|
/// threads through cleanly: a real apply carrying a Historize=true Equipment VirtualTag is Applied
|
||||||
|
/// with the real host spawned and the capturing writer wired in, with no construction error.
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class DriverHostActorHistoryWriterTests : RuntimeActorTestBase
|
||||||
|
{
|
||||||
|
private static readonly NodeId TestNode = NodeId.Parse("driver-hw-test");
|
||||||
|
private static readonly RevisionHash RevA = RevisionHash.Parse(new string('a', 64));
|
||||||
|
|
||||||
|
/// <summary>Spawning a real DriverHostActor (no host override, publish actor wired so the real
|
||||||
|
/// VirtualTagHostActor spawns) with a capturing IHistoryWriter applies a Historize=true VirtualTag
|
||||||
|
/// deployment cleanly — proving Props/ctor accept the writer and thread it through the real spawn.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Props_accepts_history_writer_and_applies_historized_vtag_deployment()
|
||||||
|
{
|
||||||
|
var db = NewInMemoryDbFactory();
|
||||||
|
var deploymentId = SeedDeploymentWithHistorizedVirtualTag(db, RevA);
|
||||||
|
|
||||||
|
var coordinator = CreateTestProbe();
|
||||||
|
// A publish actor MUST be wired for SpawnVirtualTagHost to spawn the real host (its ctor
|
||||||
|
// requires a non-null sink). No virtualTagHostOverride ⇒ the real VirtualTagHostActor is
|
||||||
|
// spawned with _historyWriter threaded in.
|
||||||
|
var publish = CreateTestProbe();
|
||||||
|
var writer = new CapturingHistoryWriter();
|
||||||
|
|
||||||
|
var actor = Sys.ActorOf(DriverHostActor.Props(
|
||||||
|
db, TestNode, coordinator.Ref,
|
||||||
|
localRoles: new HashSet<string> { "driver" },
|
||||||
|
opcUaPublishActor: publish.Ref,
|
||||||
|
virtualTagEvaluator: NullVirtualTagEvaluator.Instance,
|
||||||
|
historyWriter: writer));
|
||||||
|
|
||||||
|
actor.Tell(new DispatchDeployment(deploymentId, RevA, CorrelationId.NewId()));
|
||||||
|
|
||||||
|
// The deployment Applies — the real VirtualTagHostActor spawned and accepted ApplyVirtualTags
|
||||||
|
// with the writer threaded through (a construction/threading failure would have faulted PreStart
|
||||||
|
// or the apply turn). Record behaviour itself is covered by VirtualTagHostActorTests (H5c).
|
||||||
|
coordinator.ExpectMsg<ApplyAck>(TimeSpan.FromSeconds(5)).Outcome.ShouldBe(ApplyAckOutcome.Applied);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static DeploymentId SeedDeploymentWithHistorizedVirtualTag(
|
||||||
|
IDbContextFactory<OtOpcUaConfigDbContext> db, RevisionHash rev)
|
||||||
|
{
|
||||||
|
var artifact = JsonSerializer.SerializeToUtf8Bytes(new
|
||||||
|
{
|
||||||
|
Scripts = new[]
|
||||||
|
{
|
||||||
|
new
|
||||||
|
{
|
||||||
|
ScriptId = "scr-1",
|
||||||
|
SourceCode = "return ctx.GetTag(\"TestMachine_001.TestDouble\").Value;",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
VirtualTags = new[]
|
||||||
|
{
|
||||||
|
new
|
||||||
|
{
|
||||||
|
VirtualTagId = "vt-1",
|
||||||
|
EquipmentId = "eq-1",
|
||||||
|
Name = "Doubled",
|
||||||
|
DataType = "Float",
|
||||||
|
ScriptId = "scr-1",
|
||||||
|
Historize = true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
var id = DeploymentId.NewId();
|
||||||
|
using var ctx = db.CreateDbContext();
|
||||||
|
ctx.Deployments.Add(new Deployment
|
||||||
|
{
|
||||||
|
DeploymentId = id.Value,
|
||||||
|
RevisionHash = rev.Value,
|
||||||
|
Status = DeploymentStatus.Sealed,
|
||||||
|
CreatedBy = "test",
|
||||||
|
SealedAtUtc = DateTime.UtcNow,
|
||||||
|
ArtifactBlob = artifact,
|
||||||
|
});
|
||||||
|
ctx.SaveChanges();
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Capturing <see cref="IHistoryWriter"/>: records every Record call so the threading
|
||||||
|
/// smoke test can be tightened later if a record assertion becomes feasible here.</summary>
|
||||||
|
private sealed class CapturingHistoryWriter : IHistoryWriter
|
||||||
|
{
|
||||||
|
public readonly ConcurrentQueue<(string Path, DataValueSnapshot Value)> Calls = new();
|
||||||
|
|
||||||
|
/// <summary>Captures the path + snapshot of a Record call.</summary>
|
||||||
|
/// <param name="path">The virtual tag path.</param>
|
||||||
|
/// <param name="value">The data value snapshot.</param>
|
||||||
|
public void Record(string path, DataValueSnapshot value) => Calls.Enqueue((path, value));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user