using System.Collections.Concurrent;
using System.Text.Json;
using Akka.Actor;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
///
/// Task 9 — the focused END-TO-END proof that a driver-discovered FixedTree node is grafted into the
/// served Equipment OPC UA address space and a polled value reaches it. Unlike the Task-7/8 suites
/// (which wire the OPC UA publish side as a and assert on the
/// intercepted /
/// messages), this suite wires the FULL real chain:
///
///
/// - a real (resolves equipment, maps via
/// , extends the live-value routing map, caches the plan);
/// - a real as its opcUaPublishActor seam (so
/// MaterialiseDiscoveredNodes + AttributeValueUpdate are actually handled, not
/// intercepted);
/// - a real over a recording
/// (so the materialise + value-write reach the sink).
///
///
///
/// The assertions are therefore made on the SINK's recorded EnsureVariable /
/// RaiseNodesAddedModelChange / WriteValue calls — i.e. the discovered node was
/// materialised through the real applier AND a published value surfaces
/// at the mapped NodeId (in production this overwrites the BadWaitingForInitialData seed that
/// OtOpcUaNodeManager.EnsureVariable stamps on a freshly-materialised variable; the recording
/// sink does not model that seed, so the faithful assertion available here is that the live value
/// lands Good at the same NodeId the materialise created).
///
///
///
/// Seam choices (faithful to the sibling suites). Discovery is driven by Telling the host
/// directly, and the polled value by Telling
/// directly — exactly the seams the Task-7/8
/// tests use (there is no test seam to drive a real poll loop through a
/// child to Connected, and the spawned child is a ). The publish
/// actor is wired WITHOUT a dbFactory, so the host's apply-time
/// falls back to a raw sink.RebuildAddressSpace() (no EnsureVariable); this keeps the
/// ONLY EnsureVariable traffic on the sink the discovered-node materialise itself, so the
/// mapped NodeId is unambiguous. The discovery-injection chain (mapper → applier → sink + routing
/// map) is fully real.
///
///
[Trait("Category", "Unit")]
public sealed class DiscoveryInjectionEndToEndTests : RuntimeActorTestBase
{
private static readonly NodeId TestNode = NodeId.Parse("disc-e2e-node");
private static readonly RevisionHash RevA = RevisionHash.Parse(new string('a', 64));
private static readonly RevisionHash RevB = RevisionHash.Parse(new string('b', 64));
private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5);
private static readonly DateTime Ts = new(2026, 6, 26, 10, 0, 0, DateTimeKind.Utc);
// The FixedTree node the driver "discovers": FOCAS//Identity/SeriesNumber, a String value,
// whose FullReference differs from any authored tag so the mapper keeps it (does not shadow an authored
// node). The single device-host folder collapses, so it materialises at EQ-1/FOCAS/Identity/SeriesNumber.
private const string FixedTreeRef = "10.0.0.5:8193/Identity/SeriesNumber";
private const string FixedTreeDisplayName = "SeriesNumber";
private static DiscoveredNode[] FixedTreeNodes() => new[]
{
new DiscoveredNode(
FolderPathSegments: new[] { "FOCAS", "10.0.0.5:8193", "Identity" },
BrowseName: "SeriesNumber",
DisplayName: FixedTreeDisplayName,
FullReference: FixedTreeRef,
DataType: DriverDataType.String,
IsArray: false,
ArrayDim: null,
Writable: false,
IsHistorized: false),
};
///
/// End-to-end #1: the discovered FixedTree node appears at the equipment AND a polled value flows
/// Good. Drives the real host (deployment applied, real child spawned, discovery reported) wired to a
/// real publish actor + real applier + recording sink, then asserts:
/// (a) the sink recorded an EnsureVariable for the FixedTree node under EQ-1 (materialised
/// through the REAL applier — the node now exists in the served address space), with a
/// RaiseNodesAddedModelChange under EQ-1 so connected clients refresh;
/// (b) after an for the FixedTree ref, the
/// sink recorded a WriteValue at THAT SAME NodeId carrying the value with
/// — proving the live value routed end-to-end and (in production)
/// overwrote the BadWaitingForInitialData seed.
///
[Fact]
public void Discovered_node_materialises_at_equipment_and_polled_value_flows_Good()
{
var db = NewInMemoryDbFactory();
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
var (host, sink, _) = SpawnHostWithRealPublishActor(db, deploymentId);
// Driver reports its captured FixedTree (the faithful Task-7/8 seam).
host.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", FixedTreeNodes()));
// (a) The discovered variable was materialised through the REAL applier onto the sink, under EQ-1.
string fixedTreeNodeId = null!;
AwaitAssert(() =>
{
var v = sink.Variables.SingleOrDefault(x => x.DisplayName == FixedTreeDisplayName);
v.NodeId.ShouldNotBeNull(); // EnsureVariable recorded for the FixedTree node
v.NodeId.ShouldStartWith("EQ-1"); // grafted UNDER the bound equipment root
v.DataType.ShouldBe("String"); // mapper carried the driver type through to the sink
v.Writable.ShouldBeFalse(); // discovered nodes are read-only
fixedTreeNodeId = v.NodeId;
sink.ModelChanges.ShouldContain("EQ-1"); // NodeAdded announced under the equipment
}, duration: Timeout);
// (b) A value published for the FixedTree ref routes to the mapped NodeId and lands Good — the live
// value flowed end-to-end (host routing map → publish actor → applier-backing sink WriteValue).
host.Tell(new DriverInstanceActor.AttributeValuePublished("d1", FixedTreeRef, "SN-12345", OpcUaQuality.Good, Ts));
AwaitAssert(() =>
{
var write = sink.Values.SingleOrDefault(x => x.NodeId == fixedTreeNodeId);
write.NodeId.ShouldBe(fixedTreeNodeId);
write.Value.ShouldBe("SN-12345");
write.Quality.ShouldBe(OpcUaQuality.Good);
write.Ts.ShouldBe(Ts);
}, duration: Timeout);
}
///
/// End-to-end #2 (Task 8 survival): the injected FixedTree node + its live-value route SURVIVE a
/// redeploy. After the first injection materialises + a value flows Good, a SECOND deployment (new
/// revision, same d1 → EQ-1 binding) re-runs PushDesiredSubscriptions — which clears the
/// routing maps and re-pushes an authored-only subscription set; the Task-8 tail re-apply re-grafts
/// the cached discovered plan. Asserts the sink records the FixedTree EnsureVariable AGAIN
/// (re-materialised after the rebuild) at the same NodeId, and a subsequent published value STILL
/// WriteValues Good there (the routing map was rebuilt, not left empty by the Clear()).
///
[Fact]
public void Discovered_node_and_value_survive_a_redeploy()
{
var db = NewInMemoryDbFactory();
var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA,
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
var (host, sink, coordinator) = SpawnHostWithRealPublishActor(db, deploymentId);
host.Tell(new DriverInstanceActor.DiscoveredNodesReady("d1", FixedTreeNodes()));
// First injection: capture the mapped NodeId once it has materialised on the sink.
string fixedTreeNodeId = null!;
AwaitAssert(() =>
{
var v = sink.Variables.SingleOrDefault(x => x.DisplayName == FixedTreeDisplayName);
v.NodeId.ShouldNotBeNull();
fixedTreeNodeId = v.NodeId;
}, duration: Timeout);
// First value flows Good (pre-redeploy baseline).
host.Tell(new DriverInstanceActor.AttributeValuePublished("d1", FixedTreeRef, "SN-AAA", OpcUaQuality.Good, Ts));
AwaitAssert(
() => sink.Values.ShouldContain(x => x.NodeId == fixedTreeNodeId && Equals(x.Value, "SN-AAA") && x.Quality == OpcUaQuality.Good),
duration: Timeout);
var ensureVarCountBefore = sink.Variables.Count(x => x.NodeId == fixedTreeNodeId);
// Apply a SECOND deployment (new revision, SAME d1 → EQ-1 binding) — re-runs PushDesiredSubscriptions
// (clears + rebuilds the routing maps) then the Task-8 tail re-applies the cached discovered plan.
var deploymentId2 = SeedDeploymentWithEquipmentTags(db, RevB,
(Equip: "EQ-1", Driver: "d1", FullName: "40001", Folder: (string?)null, Name: "speed"));
host.Tell(new DispatchDeployment(deploymentId2, RevB, CorrelationId.NewId()));
coordinator.ExpectMsg(Timeout).Outcome.ShouldBe(ApplyAckOutcome.Applied);
// (a) The cached discovered plan was RE-MATERIALISED at the SAME NodeId after the redeploy rebuild.
AwaitAssert(
() => sink.Variables.Count(x => x.NodeId == fixedTreeNodeId).ShouldBeGreaterThan(ensureVarCountBefore),
duration: Timeout);
// (b) A value published AFTER the redeploy STILL routes to the mapped NodeId and lands Good — the
// live-value routing map was rebuilt by the re-apply (not lost when PushDesiredSubscriptions cleared it).
var tsAfter = Ts.AddSeconds(5);
host.Tell(new DriverInstanceActor.AttributeValuePublished("d1", FixedTreeRef, "SN-BBB", OpcUaQuality.Good, tsAfter));
AwaitAssert(
() => sink.Values.ShouldContain(x => x.NodeId == fixedTreeNodeId && Equals(x.Value, "SN-BBB") && x.Quality == OpcUaQuality.Good),
duration: Timeout);
}
/// Spawns the real chain — recording sink → real → real
/// (the host's opcUaPublishActor seam) → real
/// backed by a — dispatches the
/// deployment, and waits for the Applied ACK so _lastComposition + the live child + the initial
/// subscribe pass have completed before discovery is injected. The publish actor is wired with the
/// applier but NO dbFactory, so its apply-time RebuildAddressSpace is a raw sink rebuild (no EnsureVariable)
/// and the only EnsureVariable traffic is the discovered-node materialise itself.
private (IActorRef Host, RecordingSink Sink, Akka.TestKit.TestProbe Coordinator) SpawnHostWithRealPublishActor(
IDbContextFactory db, DeploymentId deploymentId)
{
var coordinator = CreateTestProbe();
var vtHost = CreateTestProbe();
var sink = new RecordingSink();
var applier = new AddressSpaceApplier(sink, NullLogger.Instance);
var publish = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink, applier: applier));
var host = Sys.ActorOf(DriverHostActor.Props(
db, TestNode, coordinator.Ref,
driverFactory: new SubscribingDriverFactory("Modbus"),
localRoles: new HashSet { "driver" },
opcUaPublishActor: publish,
virtualTagHostOverride: vtHost.Ref));
host.Tell(new DispatchDeployment(deploymentId, RevA, CorrelationId.NewId()));
coordinator.ExpectMsg(Timeout).Outcome.ShouldBe(ApplyAckOutcome.Applied);
return (host, sink, coordinator);
}
/// Seeds a Sealed deployment whose artifact carries the minimal arrays needed to project
/// equipment tags + a real (non-stubbed) child for each driver
/// (mirrors DriverHostActorDiscoveryTests.SeedDeploymentWithEquipmentTags). An authored value tag
/// both sets _lastComposition and binds the driver → equipment (the only way the host resolves the
/// equipment a discovered node grafts under).
private static DeploymentId SeedDeploymentWithEquipmentTags(
IDbContextFactory db, RevisionHash rev,
params (string Equip, string Driver, string FullName, string? Folder, string Name)[] tags)
{
var driverIds = tags.Select(t => t.Driver).Distinct(StringComparer.Ordinal).ToArray();
var artifact = JsonSerializer.SerializeToUtf8Bytes(new
{
Namespaces = new[]
{
new { NamespaceId = "ns-eq", Kind = 0 }, // NamespaceKind.Equipment = 0
},
DriverInstances = driverIds.Select(d => new
{
DriverInstanceRowId = Guid.NewGuid(),
DriverInstanceId = d,
Name = d,
DriverType = "Modbus", // not Windows-only ⇒ a real child is spawned (not stubbed)
Enabled = true,
DriverConfig = "{}",
NamespaceId = "ns-eq",
}).ToArray(),
Tags = tags.Select((t, i) => new
{
TagId = $"tag-{i}",
EquipmentId = t.Equip,
DriverInstanceId = t.Driver,
Name = t.Name,
FolderPath = t.Folder,
DataType = "Double",
TagConfig = JsonSerializer.Serialize(new { FullName = t.FullName }),
}).ToArray(),
});
var id = DeploymentId.NewId();
using var ctx = db.CreateDbContext();
ctx.Deployments.Add(new Deployment
{
DeploymentId = id.Value,
RevisionHash = rev.Value,
Status = DeploymentStatus.Sealed,
CreatedBy = "test",
SealedAtUtc = DateTime.UtcNow,
ArtifactBlob = artifact,
});
ctx.SaveChanges();
return id;
}
/// Factory producing a single shared for the supported
/// type, so a real (non-stubbed) child is spawned for the driver and
/// the host's subscribe path is exercised (mirrors
/// DriverHostActorDiscoveryTests.SubscribingDriverFactory).
private sealed class SubscribingDriverFactory : IDriverFactory
{
private readonly string _supportedType;
private readonly SubscribableStubDriver _driver = new();
public SubscribingDriverFactory(string supportedType) { _supportedType = supportedType; }
///
public IDriver? TryCreate(string driverType, string driverInstanceId, string driverConfigJson) =>
string.Equals(driverType, _supportedType, StringComparison.Ordinal) ? _driver : null;
///
public IReadOnlyCollection SupportedTypes => new[] { _supportedType };
}
/// Recording — captures the EnsureFolder / EnsureVariable /
/// WriteValue / RaiseNodesAddedModelChange calls the real applier + publish actor drive, so the test can
/// assert the discovered node was materialised and a value landed Good at its NodeId. Thread-safe (the
/// publish actor runs on an Akka dispatcher thread, the test asserts from the test thread).
private sealed class RecordingSink : IOpcUaAddressSpaceSink
{
private readonly ConcurrentQueue<(string NodeId, string? ParentNodeId, string DisplayName)> _folders = new();
private readonly ConcurrentQueue<(string NodeId, string? ParentNodeId, string DisplayName, string DataType, bool Writable)> _variables = new();
private readonly ConcurrentQueue<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> _values = new();
private readonly ConcurrentQueue _modelChanges = new();
/// Gets a snapshot of the recorded EnsureFolder calls.
public List<(string NodeId, string? ParentNodeId, string DisplayName)> Folders => _folders.ToList();
/// Gets a snapshot of the recorded EnsureVariable calls.
public List<(string NodeId, string? ParentNodeId, string DisplayName, string DataType, bool Writable)> Variables => _variables.ToList();
/// Gets a snapshot of the recorded WriteValue calls.
public List<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> Values => _values.ToList();
/// Gets a snapshot of the recorded RaiseNodesAddedModelChange announcements.
public List ModelChanges => _modelChanges.ToList();
/// Gets the count of raw RebuildAddressSpace calls (apply-time rebuild fallback).
public int RebuildCalls;
/// Records a live-value write.
public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc)
=> _values.Enqueue((nodeId, value, quality, sourceTimestampUtc));
/// No-op: alarm writes are not exercised by this suite.
public void WriteAlarmCondition(string alarmNodeId, AlarmConditionSnapshot state, DateTime sourceTimestampUtc) { }
/// No-op: alarm materialise is not exercised by this suite.
public void MaterialiseAlarmCondition(string alarmNodeId, string equipmentNodeId, string displayName, string alarmType, int severity, bool isNative = false) { }
/// Records an EnsureFolder call.
public void EnsureFolder(string folderNodeId, string? parentNodeId, string displayName)
=> _folders.Enqueue((folderNodeId, parentNodeId, displayName));
/// Records an EnsureVariable call.
public void EnsureVariable(string variableNodeId, string? parentFolderNodeId, string displayName, string dataType, bool writable, string? historianTagname = null, bool isArray = false, uint? arrayLength = null)
=> _variables.Enqueue((variableNodeId, parentFolderNodeId, displayName, dataType, writable));
/// Records a raw rebuild (the apply-time fallback when no dbFactory is wired).
public void RebuildAddressSpace() => Interlocked.Increment(ref RebuildCalls);
/// Records a NodeAdded model-change announcement.
public void RaiseNodesAddedModelChange(string affectedNodeId) => _modelChanges.Enqueue(affectedNodeId);
}
}