feat(runtime): #109 OpcUaPublishActor — load artifact, compose, plan-diff, apply
Some checks failed
v2-ci / build (push) Failing after 45s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (push) Has been skipped
Some checks failed
v2-ci / build (push) Failing after 45s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (push) Has been skipped
Closes the loop between F10b (SDK NodeManager) and F14 (Phase7Plan +
Phase7Applier). DriverHostActor's successful apply now triggers a
RebuildAddressSpace on the publish actor, which loads the latest
deployment artifact + walks composer → planner → applier through the
sink. The OPC UA address space tracks the deployed composition.
DeploymentArtifact:
- New ParseComposition(blob) → Phase7CompositionResult that decodes
Equipment + DriverInstance + ScriptedAlarm arrays into the
projection records Phase7Planner consumes. Pascal-case property
names mirror ConfigComposer.SnapshotAndFlattenAsync's output.
- Each entity reader is tolerant: missing-id rows are dropped,
natural-key sort matches Phase7Composer's contract.
OpcUaPublishActor:
- New Props params: dbFactory + applier. When wired, RebuildAddressSpace
does:
1. LoadLatestArtifact (most recent Sealed Deployment.ArtifactBlob)
2. ParseComposition → Phase7CompositionResult
3. Phase7Planner.Compute(lastApplied, next) → Phase7Plan
4. Empty plan ⇒ no-op (deploy of unchanged composition is benign)
5. applier.Apply(plan) drives sink.RebuildAddressSpace +
WriteAlarmState for removed nodes
6. lastApplied = next so the next rebuild diffs forward
- Without dbFactory/applier wiring, falls back to raw
sink.RebuildAddressSpace — the dev/Mac path before #108 binds prod.
DriverHostActor:
- New Props param opcUaPublishActor (IActorRef?). After successful
ApplyAndAck (status Applied, ACK sent), tells the publish actor
RebuildAddressSpace with the same correlation id so the audit trail
threads through. Null publish actor ⇒ no trigger (admin-only nodes).
Tests: Runtime 63 -> 69 (+6):
- ParseComposition reads Equipment/Driver/Alarm sorted by natural key
- ParseComposition returns empty for empty blob
- Rebuild with dbFactory + sealed deployment artifact triggers exactly
one sink.Rebuild call (Equipment topology added)
- Rebuild with no artifact is idempotent no-op
- Second rebuild with same composition is empty-plan no-op
- Rebuild without dbFactory falls back to raw sink.Rebuild (legacy path)
All 6 v2 test suites green: 173 tests passing.
Closes #109. Engine-wiring data flow is now end-to-end through:
Deploy → DriverHostActor.ApplyAndAck → driver spawn + ACK +
RebuildAddressSpace → OpcUaPublishActor → Phase7Applier → SDK
NodeManager → subscribed OPC UA clients see the change.
This commit is contained in:
@@ -69,6 +69,48 @@ public sealed class DeploymentArtifactTests
|
||||
specs[1].Enabled.ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ParseComposition_returns_empty_for_empty_blob()
|
||||
{
|
||||
var c = DeploymentArtifact.ParseComposition(ReadOnlySpan<byte>.Empty);
|
||||
c.EquipmentNodes.ShouldBeEmpty();
|
||||
c.DriverInstancePlans.ShouldBeEmpty();
|
||||
c.ScriptedAlarmPlans.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ParseComposition_reads_all_three_entity_classes_sorted_by_id()
|
||||
{
|
||||
var blob = JsonSerializer.SerializeToUtf8Bytes(new
|
||||
{
|
||||
Equipment = new[]
|
||||
{
|
||||
new { EquipmentId = "eq-z", MachineCode = "Z", UnsLineId = "line-1" },
|
||||
new { EquipmentId = "eq-a", MachineCode = "A", UnsLineId = "line-1" },
|
||||
},
|
||||
DriverInstances = new[]
|
||||
{
|
||||
new { DriverInstanceId = "drv-1", DriverType = "Modbus", DriverConfig = "{}" },
|
||||
},
|
||||
ScriptedAlarms = new[]
|
||||
{
|
||||
new
|
||||
{
|
||||
ScriptedAlarmId = "alarm-1",
|
||||
EquipmentId = "eq-a",
|
||||
PredicateScriptId = "script-1",
|
||||
MessageTemplate = "high",
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
var c = DeploymentArtifact.ParseComposition(blob);
|
||||
|
||||
c.EquipmentNodes.Select(e => e.EquipmentId).ShouldBe(new[] { "eq-a", "eq-z" });
|
||||
c.DriverInstancePlans.Single().DriverInstanceId.ShouldBe("drv-1");
|
||||
c.ScriptedAlarmPlans.Single().ScriptedAlarmId.ShouldBe("alarm-1");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Spec_missing_required_fields_is_dropped()
|
||||
{
|
||||
|
||||
@@ -0,0 +1,144 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Text.Json;
|
||||
using Akka.Actor;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.OpcUa;
|
||||
|
||||
public sealed class OpcUaPublishActorRebuildTests : RuntimeActorTestBase
|
||||
{
|
||||
[Fact]
|
||||
public void RebuildAddressSpace_with_dbFactory_loads_artifact_composes_and_applies()
|
||||
{
|
||||
var db = NewInMemoryDbFactory();
|
||||
var sink = new RecordingSink();
|
||||
var applier = new Phase7Applier(sink, NullLogger<Phase7Applier>.Instance);
|
||||
|
||||
SeedDeployment(db, equipmentIds: new[] { "eq-1", "eq-2" }, driverIds: new[] { "drv-1" });
|
||||
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
|
||||
sink: sink,
|
||||
dbFactory: db,
|
||||
applier: applier));
|
||||
|
||||
actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));
|
||||
|
||||
AwaitAssert(() =>
|
||||
{
|
||||
// Add path: Equipment + Driver + Alarm — but only Equipment/Alarm topology triggers
|
||||
// RebuildAddressSpace. With 2 new equipment we expect one Rebuild call.
|
||||
sink.RebuildCalls.ShouldBe(1);
|
||||
}, duration: TimeSpan.FromSeconds(2));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Rebuild_with_no_artifact_is_idempotent_no_op()
|
||||
{
|
||||
var db = NewInMemoryDbFactory();
|
||||
var sink = new RecordingSink();
|
||||
var applier = new Phase7Applier(sink, NullLogger<Phase7Applier>.Instance);
|
||||
|
||||
// No deployment seeded — LoadLatestArtifact returns empty blob.
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
|
||||
sink: sink,
|
||||
dbFactory: db,
|
||||
applier: applier));
|
||||
|
||||
actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));
|
||||
Thread.Sleep(200);
|
||||
|
||||
sink.RebuildCalls.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Second_rebuild_with_same_artifact_is_empty_plan_no_op()
|
||||
{
|
||||
var db = NewInMemoryDbFactory();
|
||||
var sink = new RecordingSink();
|
||||
var applier = new Phase7Applier(sink, NullLogger<Phase7Applier>.Instance);
|
||||
SeedDeployment(db, equipmentIds: new[] { "eq-1" }, driverIds: Array.Empty<string>());
|
||||
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
|
||||
sink: sink, dbFactory: db, applier: applier));
|
||||
|
||||
actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));
|
||||
AwaitAssert(() => sink.RebuildCalls.ShouldBe(1), duration: TimeSpan.FromSeconds(2));
|
||||
|
||||
actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));
|
||||
Thread.Sleep(200);
|
||||
// Same composition ⇒ plan IsEmpty ⇒ applier not called again.
|
||||
sink.RebuildCalls.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Rebuild_without_dbFactory_falls_back_to_raw_sink_rebuild()
|
||||
{
|
||||
// Pre-#109 behavior: no dbFactory wired ⇒ RebuildAddressSpace calls _sink.RebuildAddressSpace
|
||||
// directly. The dev/Mac path before the full integration is bound.
|
||||
var sink = new RecordingSink();
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink));
|
||||
|
||||
actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));
|
||||
|
||||
AwaitAssert(() => sink.RebuildCalls.ShouldBe(1), duration: TimeSpan.FromMilliseconds(500));
|
||||
}
|
||||
|
||||
private static void SeedDeployment(
|
||||
IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
|
||||
string[] equipmentIds,
|
||||
string[] driverIds)
|
||||
{
|
||||
var artifact = JsonSerializer.SerializeToUtf8Bytes(new
|
||||
{
|
||||
Equipment = equipmentIds.Select(id => new
|
||||
{
|
||||
EquipmentId = id,
|
||||
MachineCode = id.ToUpperInvariant(),
|
||||
UnsLineId = "line-1",
|
||||
Name = id,
|
||||
}).ToArray(),
|
||||
DriverInstances = driverIds.Select(id => new
|
||||
{
|
||||
DriverInstanceId = id,
|
||||
DriverType = "Modbus",
|
||||
Enabled = true,
|
||||
DriverConfig = "{}",
|
||||
}).ToArray(),
|
||||
ScriptedAlarms = Array.Empty<object>(),
|
||||
});
|
||||
|
||||
using var ctx = dbFactory.CreateDbContext();
|
||||
ctx.Deployments.Add(new Deployment
|
||||
{
|
||||
DeploymentId = Guid.NewGuid(),
|
||||
RevisionHash = new string('a', 64),
|
||||
Status = DeploymentStatus.Sealed,
|
||||
CreatedBy = "test",
|
||||
SealedAtUtc = DateTime.UtcNow,
|
||||
ArtifactBlob = artifact,
|
||||
});
|
||||
ctx.SaveChanges();
|
||||
}
|
||||
|
||||
private sealed class RecordingSink : IOpcUaAddressSpaceSink
|
||||
{
|
||||
public ConcurrentQueue<string> Calls { get; } = new();
|
||||
public int RebuildCalls;
|
||||
public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime ts)
|
||||
=> Calls.Enqueue($"WV:{nodeId}");
|
||||
public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime ts)
|
||||
=> Calls.Enqueue($"WA:{alarmNodeId}");
|
||||
public void RebuildAddressSpace() => Interlocked.Increment(ref RebuildCalls);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user