607dc51dec
v2-ci / build (push) Failing after 42s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (push) Has been skipped
Phase7Composer now carries UnsAreaProjection + UnsLineProjection lists so the applier can materialise the full UNS topology in the OPC UA address space. New IOpcUaAddressSpaceSink.EnsureFolder(folderNodeId, parentNodeId, displayName) seam (no-op default, recorded in tests, forwarded by DeferredAddressSpaceSink, implemented by SdkAddressSpaceSink). The SDK-side OtOpcUaNodeManager gains an EnsureFolder API that creates FolderState nodes with proper parent linkage; RebuildAddressSpace now clears folders too so re-applies don't accumulate stale topology. Phase7Applier.MaterialiseHierarchy walks composition.UnsAreas → composition.UnsLines → composition.EquipmentNodes, calling EnsureFolder with the correct parent at each level. Idempotent — calling twice with the same composition is a no-op. OpcUaPublishActor.HandleRebuild invokes it after Phase7Applier.Apply so OPC UA clients browsing the server now see Area/Line/Equipment as proper folders rather than flat tag ids. DeploymentArtifact.ParseComposition reads UnsAreas + UnsLines from the JSON snapshot the ControlPlane emits, populating the new fields when present. Phase7Composer.Compose now accepts UnsAreas + UnsLines; a 3-arg overload preserves the old signature for legacy callers + existing tests. The Phase7CompositionResult convenience ctor likewise keeps the planner tests working without UNS data. 3 new hierarchy tests (pure unit + boot-verify against a real OtOpcUaSdkServer); OpcUaServer suite is 48/48 green (was 45, +3), Runtime 74/74 unchanged. Closes #85.
268 lines
11 KiB
C#
268 lines
11 KiB
C#
using Akka.Actor;
|
|
using Akka.Cluster.Tools.PublishSubscribe;
|
|
using Akka.Event;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
|
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
|
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
|
|
|
/// <summary>
/// Single-threaded bridge between Akka messages and the OPC UA SDK address space. Hosted on
/// the pinned <c>opcua-synchronized-dispatcher</c> (Task 19 HOCON) so the OPC UA SDK sees
/// only one thread per actor instance — its session/subscription locks expect strict
/// single-threaded access.
///
/// Address-space writes route through <see cref="IOpcUaAddressSpaceSink"/>; ServiceLevel
/// writes route through <see cref="IServiceLevelPublisher"/>. Production binds SDK-backed
/// implementations; dev/Mac/tests bind the Null* defaults so the actor stays decoupled from
/// <c>Opc.Ua.Server</c>. The remaining piece is wiring those bindings to a real
/// <c>StandardServer</c> address space — tracked as F10b.
/// </summary>
public sealed class OpcUaPublishActor : ReceiveActor
{
    public const string DispatcherId = "opcua-synchronized-dispatcher";
    public const string RedundancyStateTopic = "redundancy-state";

    /// <summary>Write a value/quality/timestamp for <paramref name="NodeId"/> through the sink.</summary>
    public sealed record AttributeValueUpdate(string NodeId, object? Value, OpcUaQuality Quality, DateTime TimestampUtc);

    /// <summary>Write an alarm's active/acknowledged state through the sink.</summary>
    public sealed record AlarmStateUpdate(string AlarmNodeId, bool Active, bool Acknowledged, DateTime TimestampUtc);

    /// <summary>Re-sync the address space with the latest sealed deployment, or do a raw sink
    /// rebuild when no ConfigDb factory / applier is wired.</summary>
    public sealed record RebuildAddressSpace(CorrelationId Correlation);

    /// <summary>Publish a new ServiceLevel through the <see cref="IServiceLevelPublisher"/>.</summary>
    public sealed record ServiceLevelChanged(byte ServiceLevel);

    private readonly IOpcUaAddressSpaceSink _sink;
    private readonly IServiceLevelPublisher _serviceLevel;
    private readonly bool _subscribeRedundancyTopic;
    private readonly NodeId? _localNode;
    private readonly IDbContextFactory<OtOpcUaConfigDbContext>? _dbFactory;
    private readonly Phase7Applier? _applier;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    // Successful sink writes (value + alarm); surfaced via WriteCount.
    private int _writes;

    // Last ServiceLevel requested via ServiceLevelChanged; surfaced via LastServiceLevel.
    private byte _lastServiceLevel;

    // Last ServiceLevel the publisher accepted without throwing, or null when unknown (before
    // the first successful publish, or after a publish threw). This is the de-duplication
    // baseline: keeping it separate from _lastServiceLevel (whose default is 0) means an
    // initial level of 0 is still pushed, and a level whose publish threw is retried.
    private byte? _publishedServiceLevel;

    // Composition the applier last converged to — the baseline the planner diffs against.
    private Phase7CompositionResult _lastApplied = new(
        Array.Empty<EquipmentNode>(),
        Array.Empty<DriverInstancePlan>(),
        Array.Empty<ScriptedAlarmPlan>());

    public int WriteCount => _writes;
    public byte LastServiceLevel => _lastServiceLevel;

    /// <summary>Production Props — pins the OPC UA dispatcher + subscribes to the
    /// <c>redundancy-state</c> DPS topic so cluster transitions drive the local ServiceLevel
    /// publish path. When <paramref name="dbFactory"/> + <paramref name="applier"/> are supplied,
    /// <see cref="RebuildAddressSpace"/> reads the latest deployment artifact + drives the
    /// applier through the sink.</summary>
    public static Props Props(
        IOpcUaAddressSpaceSink? sink = null,
        IServiceLevelPublisher? serviceLevel = null,
        NodeId? localNode = null,
        IDbContextFactory<OtOpcUaConfigDbContext>? dbFactory = null,
        Phase7Applier? applier = null) =>
        Akka.Actor.Props.Create(() => new OpcUaPublishActor(
            sink ?? NullOpcUaAddressSpaceSink.Instance,
            serviceLevel ?? NullServiceLevelPublisher.Instance,
            subscribeRedundancyTopic: true,
            localNode,
            dbFactory,
            applier)).WithDispatcher(DispatcherId);

    /// <summary>Test-only Props that omits the pinned-dispatcher requirement and skips the
    /// DPS subscribe so unit tests can spin up the actor on a vanilla TestKit cluster.</summary>
    public static Props PropsForTests(
        IOpcUaAddressSpaceSink? sink = null,
        IServiceLevelPublisher? serviceLevel = null,
        bool subscribeRedundancyTopic = false,
        NodeId? localNode = null,
        IDbContextFactory<OtOpcUaConfigDbContext>? dbFactory = null,
        Phase7Applier? applier = null) =>
        Akka.Actor.Props.Create(() => new OpcUaPublishActor(
            sink ?? NullOpcUaAddressSpaceSink.Instance,
            serviceLevel ?? NullServiceLevelPublisher.Instance,
            subscribeRedundancyTopic,
            localNode,
            dbFactory,
            applier));

    public OpcUaPublishActor(
        IOpcUaAddressSpaceSink sink,
        IServiceLevelPublisher serviceLevel,
        bool subscribeRedundancyTopic,
        NodeId? localNode,
        IDbContextFactory<OtOpcUaConfigDbContext>? dbFactory = null,
        Phase7Applier? applier = null)
    {
        _sink = sink;
        _serviceLevel = serviceLevel;
        _subscribeRedundancyTopic = subscribeRedundancyTopic;
        _localNode = localNode;
        _dbFactory = dbFactory;
        _applier = applier;

        Receive<AttributeValueUpdate>(HandleAttributeUpdate);
        Receive<AlarmStateUpdate>(HandleAlarmUpdate);
        Receive<RebuildAddressSpace>(HandleRebuild);
        Receive<ServiceLevelChanged>(HandleServiceLevelChanged);
        Receive<RedundancyStateChanged>(HandleRedundancyStateChanged);
        Receive<SubscribeAck>(_ => { /* PubSub ack */ });
    }

    protected override void PreStart()
    {
        if (_subscribeRedundancyTopic)
        {
            DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(RedundancyStateTopic, Self));
        }
    }

    private void HandleAttributeUpdate(AttributeValueUpdate msg)
    {
        try
        {
            _sink.WriteValue(msg.NodeId, msg.Value, msg.Quality, msg.TimestampUtc);
            Interlocked.Increment(ref _writes);
            OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair<string, object?>("kind", "value"));
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "OpcUaPublish: sink.WriteValue threw for {Node}", msg.NodeId);
        }
    }

    private void HandleAlarmUpdate(AlarmStateUpdate msg)
    {
        try
        {
            _sink.WriteAlarmState(msg.AlarmNodeId, msg.Active, msg.Acknowledged, msg.TimestampUtc);
            Interlocked.Increment(ref _writes);
            OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair<string, object?>("kind", "alarm"));
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "OpcUaPublish: sink.WriteAlarmState threw for {Node}", msg.AlarmNodeId);
        }
    }

    private void HandleRebuild(RebuildAddressSpace msg)
    {
        using var span = OtOpcUaTelemetry.StartAddressSpaceRebuildSpan();
        span?.SetTag("otopcua.correlation_id", msg.Correlation.ToString());

        // Two modes: when dbFactory + applier are wired, do a real diff-and-apply pass against
        // the latest deployment artifact. Without them, fall back to a raw sink rebuild — the
        // F10b/dev path before the integration completes.
        if (_dbFactory is null || _applier is null)
        {
            try
            {
                _sink.RebuildAddressSpace();
                OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair<string, object?>("kind", "rebuild"));
            }
            catch (Exception ex)
            {
                _log.Error(ex, "OpcUaPublish: sink.RebuildAddressSpace threw (correlation={Correlation})",
                    msg.Correlation);
            }
            return;
        }

        try
        {
            // A ConfigDb failure must leave the address space untouched: diffing _lastApplied
            // against an empty composition would produce a plan that removes every live node.
            if (!TryLoadLatestArtifact(_dbFactory, out var artifact))
            {
                return;
            }

            var composition = DeploymentArtifact.ParseComposition(artifact);
            var plan = Phase7Planner.Compute(_lastApplied, composition);

            if (plan.IsEmpty)
            {
                // Nothing to diff, but still re-run the idempotent hierarchy pass. _lastApplied
                // advances before MaterialiseHierarchy runs below, so if that call threw on an
                // earlier rebuild, this is the only path that repairs the missing folders.
                _applier.MaterialiseHierarchy(composition);
                _log.Debug("OpcUaPublish: rebuild requested but plan is empty (correlation={Correlation})",
                    msg.Correlation);
                return;
            }

            var outcome = _applier.Apply(plan);
            _lastApplied = composition;

            // #85 — after the plan diff lands, rebuild the UNS folder hierarchy so OPC UA
            // clients see Area/Line/Equipment as proper folders. Idempotent; Phase7Applier
            // skips folders that already exist with the same node id.
            _applier.MaterialiseHierarchy(composition);

            OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair<string, object?>("kind", "rebuild"));
            _log.Info("OpcUaPublish: applied rebuild (correlation={Correlation}, added={Added}, removed={Removed}, changed={Changed}, rebuild={Rebuild})",
                msg.Correlation, outcome.AddedNodes, outcome.RemovedNodes, outcome.ChangedNodes, outcome.RebuildCalled);
        }
        catch (Exception ex)
        {
            _log.Error(ex, "OpcUaPublish: rebuild pipeline threw (correlation={Correlation})", msg.Correlation);
        }
    }

    /// <summary>Read the most recent <c>Sealed</c> deployment's artifact blob from ConfigDb.</summary>
    /// <param name="dbFactory">Factory for the ConfigDb context (non-null; checked by the caller).</param>
    /// <param name="artifact">The newest sealed artifact blob, or an empty array when no sealed
    /// deployment exists (the parser treats an empty blob as "no composition").</param>
    /// <returns><c>false</c> when the query threw. The failure is logged and the caller must skip
    /// the rebuild.</returns>
    private bool TryLoadLatestArtifact(IDbContextFactory<OtOpcUaConfigDbContext> dbFactory, out byte[] artifact)
    {
        try
        {
            using var db = dbFactory.CreateDbContext();
            artifact = db.Deployments.AsNoTracking()
                .Where(d => d.Status == Configuration.Enums.DeploymentStatus.Sealed)
                .OrderByDescending(d => d.SealedAtUtc)
                .Select(d => d.ArtifactBlob)
                .FirstOrDefault() ?? Array.Empty<byte>();
            return true;
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "OpcUaPublish: failed to load latest deployment artifact; rebuild becomes no-op");
            artifact = Array.Empty<byte>();
            return false;
        }
    }

    /// <summary>
    /// Forward a ServiceLevel to the publisher, skipping it only when the same level was already
    /// published successfully. A throw from the publisher clears the de-dup baseline, so the next
    /// request (even for the same level) is pushed again.
    /// </summary>
    private void HandleServiceLevelChanged(ServiceLevelChanged msg)
    {
        if (_publishedServiceLevel == msg.ServiceLevel) return;
        _lastServiceLevel = msg.ServiceLevel;
        try
        {
            _serviceLevel.Publish(msg.ServiceLevel);
            _publishedServiceLevel = msg.ServiceLevel;
            OtOpcUaTelemetry.ServiceLevelChange.Add(1,
                new KeyValuePair<string, object?>("level", msg.ServiceLevel));
            _log.Debug("OpcUaPublish: ServiceLevel={Level}", msg.ServiceLevel);
        }
        catch (Exception ex)
        {
            // The publisher's state is unknown after a throw — forget the baseline so a retry
            // is not suppressed as a duplicate.
            _publishedServiceLevel = null;
            _log.Warning(ex, "OpcUaPublish: ServiceLevel publisher threw at level {Level}", msg.ServiceLevel);
        }
    }

    /// <summary>
    /// Compute a coarse ServiceLevel from the cluster snapshot and forward it (via a
    /// <see cref="ServiceLevelChanged"/> to Self) to the <see cref="IServiceLevelPublisher"/>.
    /// This is a placeholder for F10b's full health aggregation. For now the mapping is:
    /// primary + role leader → 240, primary (not leader) → 200, secondary → 100,
    /// detached/unknown → 0. That is enough for the local SDK to reflect role state.
    /// The full <see cref="ServiceLevelCalculator"/> path (with DB-reachable, OPC UA probe inputs)
    /// lives in <c>RedundancyStateActor</c> on admin nodes. This driver-side mirror exists so each
    /// node's own SDK exposes a sensible ServiceLevel without round-tripping back through the
    /// admin singleton. Snapshots are ignored when no local node id is configured or the local
    /// node is absent from the snapshot.
    /// </summary>
    private void HandleRedundancyStateChanged(RedundancyStateChanged msg)
    {
        if (_localNode is null) return;

        var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value);
        if (local is null) return;

        byte level = local.Role switch
        {
            RedundancyRole.Primary when local.IsRoleLeaderForDriver => 240,
            RedundancyRole.Primary => 200,
            RedundancyRole.Secondary => 100,
            RedundancyRole.Detached => 0,
            _ => 0,
        };
        Self.Tell(new ServiceLevelChanged(level));
    }
}
|