feat(runtime): F10 OpcUaPublishActor sink seams + redundancy-driven ServiceLevel

OpcUaPublishActor now routes through pluggable seams instead of just
incrementing a counter:

- IOpcUaAddressSpaceSink (Commons.OpcUa) — WriteValue / WriteAlarmState
  / RebuildAddressSpace. OpcUaQuality enum moved here from the actor's
  nested type so producers don't have to reference the actor itself.
- IServiceLevelPublisher — Publish(byte). NullServiceLevelPublisher
  retains the last level for inspection.
- The actor subscribes to the redundancy-state DPS topic in PreStart
  and maps the local node's NodeRedundancyState to a coarse
  ServiceLevel (Primary+leader=240, Primary=200, Secondary=100,
  Detached=0). This keeps the local SDK's ServiceLevel node honest
  without round-tripping back through the admin-singleton calculator.
- ServiceLevelChanged dedupes identical levels so the SDK doesn't see
  redundant writes.
- Sink + publisher exceptions are caught and logged; the actor never
  crashes its own dispatcher.
- PropsForTests gets optional sink/publisher/localNode params and
  skips the DPS subscribe so unit tests stay on a vanilla TestKit
  cluster.

Production binding to a real SDK NodeManager + Variable nodes is the
remaining residual — split as F10b. Task 60 still blocked on F10b.

Tests: Runtime 40 -> 46 (+6):
- AttributeValueUpdate routes to sink
- AlarmStateUpdate routes to sink
- RebuildAddressSpace calls sink.Rebuild
- ServiceLevelChanged dedupes
- RedundancyStateChanged for primary-leader publishes 240
- RedundancyStateChanged for secondary publishes 100

All 6 v2 test suites green: 132 tests passing.
This commit is contained in:
Joseph Doherty
2026-05-26 09:10:55 -04:00
parent 14fb2b05ed
commit a1325299ce
5 changed files with 340 additions and 40 deletions

View File

@@ -84,7 +84,7 @@
{"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "partial", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands.", "shipped": "Spawn lifecycle in DriverHostActor: artifact parsing, DriverSpawnPlanner pure-diff (spawn/delta/stop), IDriverFactory abstraction in Core.Abstractions with NullDriverFactory + DriverFactoryRegistryAdapter, ApplyDelta forwarded to children. Subscription publishing + write path still stubbed — split into F7-sub (subscribe + write)."}, {"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "partial", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands.", "shipped": "Spawn lifecycle in DriverHostActor: artifact parsing, DriverSpawnPlanner pure-diff (spawn/delta/stop), IDriverFactory abstraction in Core.Abstractions with NullDriverFactory + DriverFactoryRegistryAdapter, ApplyDelta forwarded to children. Subscription publishing + write path still stubbed — split into F7-sub (subscribe + write)."},
{"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers.", "shipped": "IVirtualTagEvaluator seam in Commons.Engines + NullVirtualTagEvaluator default. VirtualTagActor calls evaluator on DependencyValueChanged, dedupes unchanged results, emits EvaluationResult to parent, publishes ScriptLogEntry Warning to script-logs DPS topic on evaluator failure. Production binding to Core.VirtualTags.VirtualTagEngine still TODO (compile + ITagUpstreamSource subscribe) — split as F8b."}, {"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers.", "shipped": "IVirtualTagEvaluator seam in Commons.Engines + NullVirtualTagEvaluator default. VirtualTagActor calls evaluator on DependencyValueChanged, dedupes unchanged results, emits EvaluationResult to parent, publishes ScriptLogEntry Warning to script-logs DPS topic on evaluator failure. Production binding to Core.VirtualTags.VirtualTagEngine still TODO (compile + ITagUpstreamSource subscribe) — split as F8b."},
{"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted.", "shipped": "IScriptedAlarmEvaluator seam in Commons.Engines + NullScriptedAlarmEvaluator default. ScriptedAlarmActor takes AlarmConfig (id/name/path/severity/predicate), calls evaluator on DependencyValueChanged, emits AlarmTransitionEvent on alerts DPS topic + ScriptLogEntry on script-logs at every transition (Activated/Acknowledged/Cleared with user attribution). Predicate binding to Core.ScriptedAlarms + ScriptedAlarmState DB persistence still TODO — split as F9b."}, {"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted.", "shipped": "IScriptedAlarmEvaluator seam in Commons.Engines + NullScriptedAlarmEvaluator default. ScriptedAlarmActor takes AlarmConfig (id/name/path/severity/predicate), calls evaluator on DependencyValueChanged, emits AlarmTransitionEvent on alerts DPS topic + ScriptLogEntry on script-logs at every transition (Activated/Acknowledged/Cleared with user attribution). Predicate binding to Core.ScriptedAlarms + ScriptedAlarmState DB persistence still TODO — split as F9b."},
{"id": "F10", "subject": "Follow-up: OpcUaPublishActor SDK integration (address-space writes + ServiceLevel + RebuildAddressSpace)", "status": "pending", "classification": "high-risk", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [47], "origin": "Self-review of Task 44 — SDK calls stubbed; counters only. Wire after Phase 7 OpcUaServer extraction."}, {"id": "F10", "subject": "Follow-up: OpcUaPublishActor SDK integration (address-space writes + ServiceLevel + RebuildAddressSpace)", "status": "partial", "classification": "high-risk", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [47], "origin": "Self-review of Task 44 — SDK calls stubbed; counters only. Wire after Phase 7 OpcUaServer extraction.", "shipped": "IOpcUaAddressSpaceSink + IServiceLevelPublisher seams in Commons.OpcUa with Null* defaults. OpcUaPublishActor routes AttributeValueUpdate/AlarmStateUpdate/RebuildAddressSpace to the sink, dedupes ServiceLevelChanged, subscribes to redundancy-state DPS topic, and maps per-local-node redundancy snapshot to a coarse ServiceLevel (Primary+leader=240, Primary=200, Secondary=100, Detached=0). Production binding to a real SDK NodeManager + Variable nodes still TODO — split as F10b. Task 60 still blocked on F10b."},
{"id": "F11", "subject": "Follow-up: HistorianAdapterActor named-pipe IPC + SqliteStoreAndForwardSink wiring", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "6861381", "deviationNotes": "Reshaped HistorianAdapterActor around the existing IAlarmHistorianSink abstraction (alarm-event shape, not the original tag-history-row stub). Defaults to NullAlarmHistorianSink; production deployments wire SqliteStoreAndForwardSink + WonderwareHistorianClient via AddOtOpcUaRuntime overrides. Actor now exposes GetStatus returning HistorianSinkStatus for diagnostics. Named-pipe transport implementation lives unchanged in src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs — the actor is intentionally just a fire-and-forget bridge.", "origin": "Self-review of Task 45 — stub buffers in-memory; named-pipe + SQLite store-and-forward not wired."}, {"id": "F11", "subject": "Follow-up: HistorianAdapterActor named-pipe IPC + SqliteStoreAndForwardSink wiring", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "6861381", "deviationNotes": "Reshaped HistorianAdapterActor around the existing IAlarmHistorianSink abstraction (alarm-event shape, not the original tag-history-row stub). Defaults to NullAlarmHistorianSink; production deployments wire SqliteStoreAndForwardSink + WonderwareHistorianClient via AddOtOpcUaRuntime overrides. Actor now exposes GetStatus returning HistorianSinkStatus for diagnostics. Named-pipe transport implementation lives unchanged in src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs — the actor is intentionally just a fire-and-forget bridge.", "origin": "Self-review of Task 45 — stub buffers in-memory; named-pipe + SQLite store-and-forward not wired."},
{"id": "F12", "subject": "Follow-up: PeerOpcUaProbeActor real opc.tcp ping (replace Ok=true stub)", "status": "completed", "classification": "small", "estMinutes": 20, "parallelizableWith": [], "blockedBy": [], "commit": "b06e3ae", "deviation": "TCP-connect probe rather than full OPC UA Hello/Acknowledge handshake. Enough for the redundancy calc; deeper liveness signals can layer on later without changing the actor's contract.", "origin": "Self-review of Task 45 — RunProbe always returns Ok=true; replace with OPC UA Client connect."}, {"id": "F12", "subject": "Follow-up: PeerOpcUaProbeActor real opc.tcp ping (replace Ok=true stub)", "status": "completed", "classification": "small", "estMinutes": 20, "parallelizableWith": [], "blockedBy": [], "commit": "b06e3ae", "deviation": "TCP-connect probe rather than full OPC UA Hello/Acknowledge handshake. Enough for the redundancy calc; deeper liveness signals can layer on later without changing the actor's contract.", "origin": "Self-review of Task 45 — RunProbe always returns Ok=true; replace with OPC UA Client connect."},
{"id": "F13", "subject": "Follow-up: Full OpcUaApplicationHost extraction (security/alarms/history/observability)", "status": "partial", "classification": "high-risk", "estMinutes": 120, "parallelizableWith": [], "blockedBy": [], "commit": "36c4751-partial", "deviationNotes": "F13a (cert auto-creation) shipped in 36c4751. Remaining: endpoint-security wiring (SecurityProfileResolver into ServerConfiguration.SecurityPolicies), LDAP user-token validator (the OPC UA UserNameToken path; HTTP-layer LDAP auth is separate and already in OtOpcUa.Security), scripted-alarm node manager creation, history backend wiring, observability hooks (OpenTelemetry metrics + traces). These are gated by F10's OpcUaPublishActor SDK integration — until F10 lands, nothing instantiates OpcUaApplicationHost so the missing wiring is dead weight.", "origin": "Self-review of Task 46 — facade only boots ApplicationInstance + StandardServer. Legacy 391-line file pulls Server.Security/Alarms/History/Observability. Pull those into thin OpcUaServer interfaces."}, {"id": "F13", "subject": "Follow-up: Full OpcUaApplicationHost extraction (security/alarms/history/observability)", "status": "partial", "classification": "high-risk", "estMinutes": 120, "parallelizableWith": [], "blockedBy": [], "commit": "36c4751-partial", "deviationNotes": "F13a (cert auto-creation) shipped in 36c4751. Remaining: endpoint-security wiring (SecurityProfileResolver into ServerConfiguration.SecurityPolicies), LDAP user-token validator (the OPC UA UserNameToken path; HTTP-layer LDAP auth is separate and already in OtOpcUa.Security), scripted-alarm node manager creation, history backend wiring, observability hooks (OpenTelemetry metrics + traces). These are gated by F10's OpcUaPublishActor SDK integration — until F10 lands, nothing instantiates OpcUaApplicationHost so the missing wiring is dead weight.", "origin": "Self-review of Task 46 — facade only boots ApplicationInstance + StandardServer. Legacy 391-line file pulls Server.Security/Alarms/History/Observability. Pull those into thin OpcUaServer interfaces."},

View File

@@ -0,0 +1,37 @@
namespace ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
/// <summary>
/// Abstraction over the OPC UA SDK's address space. <c>OpcUaPublishActor</c> consumes this
/// so the Runtime project doesn't reference <c>Opc.Ua.Server</c> directly — production
/// binds a real SDK-backed sink in the fused Host's wiring, dev/Mac binds the
/// <see cref="NullOpcUaAddressSpaceSink"/> no-op.
/// </summary>
public interface IOpcUaAddressSpaceSink
{
/// <summary>Write a Variable node's current value + quality + source timestamp.</summary>
void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc);
/// <summary>Write an alarm-condition Variable's active/acknowledged state.</summary>
void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc);
/// <summary>
/// Tear down + repopulate the address space. Called by <c>OpcUaPublishActor</c> after a
/// successful deployment apply so the node manager reflects the new config. Idempotent.
/// </summary>
void RebuildAddressSpace();
}
/// <summary>OPC UA status code projection — Good / Uncertain / Bad. Real SDK has finer-grained
/// codes; the engine actors only need this 3-state classification.</summary>
public enum OpcUaQuality { Good, Uncertain, Bad }
/// <summary>No-op sink. Bound by default so the actors are safe to run in dev / Mac /
/// integration tests without a real SDK behind them.</summary>
public sealed class NullOpcUaAddressSpaceSink : IOpcUaAddressSpaceSink
{
public static readonly NullOpcUaAddressSpaceSink Instance = new();
private NullOpcUaAddressSpaceSink() { }
public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc) { }
public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc) { }
public void RebuildAddressSpace() { }
}

View File

@@ -0,0 +1,22 @@
namespace ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
/// <summary>
/// Writes the OPC UA Server object's <c>ServiceLevel</c> Variable (0255). Production binds
/// a sink that pokes the SDK's ServiceLevel node; tests + dev mode bind
/// <see cref="NullServiceLevelPublisher"/> which just records the most recently set level
/// for inspection.
/// </summary>
public interface IServiceLevelPublisher
{
void Publish(byte serviceLevel);
}
/// <summary>No-op default that retains the last-written ServiceLevel in
/// <see cref="LastPublished"/>. Used by dev mode + verified by tests.</summary>
public sealed class NullServiceLevelPublisher : IServiceLevelPublisher
{
public static readonly NullServiceLevelPublisher Instance = new();
private NullServiceLevelPublisher() { }
public byte LastPublished { get; private set; }
public void Publish(byte serviceLevel) => LastPublished = serviceLevel;
}

View File

@@ -1,70 +1,178 @@
using Akka.Actor; using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event; using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Commons.Types;
namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
/// <summary> /// <summary>
/// Single-threaded bridge between Akka messages and the OPC UA SDK address space. Hosted on /// Single-threaded bridge between Akka messages and the OPC UA SDK address space. Hosted on
/// the pinned <c>opcua-synchronized-dispatcher</c> (Task 19 HOCON) so the OPC UA SDK sees /// the pinned <c>opcua-synchronized-dispatcher</c> (Task 19 HOCON) so the OPC UA SDK sees
/// only one thread per actor instance — its session/subscription locks expect strict /// only one thread per actor instance — its session/subscription locks expect strict
/// single-threaded access. /// single-threaded access.
/// ///
/// Engine wiring (call into <c>OpcUaApplicationHost</c> address-space writes, manage /// Address-space writes route through <see cref="IOpcUaAddressSpaceSink"/>; ServiceLevel
/// <c>ServiceLevel</c> + <c>ServerUriArray</c> nodes, subscribe to the <c>redundancy-state</c> /// writes route through <see cref="IServiceLevelPublisher"/>. Production binds SDK-backed
/// DistributedPubSub topic) is staged for follow-up F10. This skeleton compiles + exposes the /// implementations; dev/Mac/tests bind the Null* defaults so the actor stays decoupled from
/// message contracts so producers (DriverInstance, VirtualTag, ScriptedAlarm) can target it. /// <c>Opc.Ua.Server</c>. The remaining piece is wiring those bindings to a real
/// <c>StandardServer</c> address space — tracked as F10b.
/// </summary> /// </summary>
public sealed class OpcUaPublishActor : ReceiveActor public sealed class OpcUaPublishActor : ReceiveActor
{ {
public const string DispatcherId = "opcua-synchronized-dispatcher"; public const string DispatcherId = "opcua-synchronized-dispatcher";
public const string RedundancyStateTopic = "redundancy-state";
public sealed record AttributeValueUpdate(string NodeId, object? Value, OpcUaQuality Quality, DateTime TimestampUtc); public sealed record AttributeValueUpdate(string NodeId, object? Value, OpcUaQuality Quality, DateTime TimestampUtc);
public sealed record AlarmStateUpdate(string AlarmNodeId, bool Active, bool Acknowledged, DateTime TimestampUtc); public sealed record AlarmStateUpdate(string AlarmNodeId, bool Active, bool Acknowledged, DateTime TimestampUtc);
public sealed record RebuildAddressSpace(CorrelationId Correlation); public sealed record RebuildAddressSpace(CorrelationId Correlation);
public sealed record ServiceLevelChanged(byte ServiceLevel); public sealed record ServiceLevelChanged(byte ServiceLevel);
public enum OpcUaQuality { Good, Uncertain, Bad } private readonly IOpcUaAddressSpaceSink _sink;
private readonly IServiceLevelPublisher _serviceLevel;
private readonly bool _subscribeRedundancyTopic;
private readonly NodeId? _localNode;
private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly ILoggingAdapter _log = Context.GetLogger();
private int _writes; private int _writes;
private byte _lastServiceLevel;
/// <summary>
/// Returns Props pre-configured to use the <c>opcua-synchronized-dispatcher</c>. Caller can
/// still override by chaining <c>.WithDispatcher(otherId)</c> for unit tests.
/// </summary>
public static Props Props() =>
Akka.Actor.Props.Create(() => new OpcUaPublishActor()).WithDispatcher(DispatcherId);
/// <summary>Test-only Props that omits the pinned dispatcher requirement.</summary>
public static Props PropsForTests() =>
Akka.Actor.Props.Create(() => new OpcUaPublishActor());
public int WriteCount => _writes; public int WriteCount => _writes;
public byte LastServiceLevel => _lastServiceLevel;
public OpcUaPublishActor() /// <summary>Production Props — pins the OPC UA dispatcher + subscribes to the
/// <c>redundancy-state</c> DPS topic so cluster transitions drive the local ServiceLevel
/// publish path.</summary>
public static Props Props(
IOpcUaAddressSpaceSink? sink = null,
IServiceLevelPublisher? serviceLevel = null,
NodeId? localNode = null) =>
Akka.Actor.Props.Create(() => new OpcUaPublishActor(
sink ?? NullOpcUaAddressSpaceSink.Instance,
serviceLevel ?? NullServiceLevelPublisher.Instance,
subscribeRedundancyTopic: true,
localNode)).WithDispatcher(DispatcherId);
/// <summary>Test-only Props that omits the pinned-dispatcher requirement and skips the
/// DPS subscribe so unit tests can spin up the actor on a vanilla TestKit cluster.</summary>
public static Props PropsForTests(
IOpcUaAddressSpaceSink? sink = null,
IServiceLevelPublisher? serviceLevel = null,
bool subscribeRedundancyTopic = false,
NodeId? localNode = null) =>
Akka.Actor.Props.Create(() => new OpcUaPublishActor(
sink ?? NullOpcUaAddressSpaceSink.Instance,
serviceLevel ?? NullServiceLevelPublisher.Instance,
subscribeRedundancyTopic,
localNode));
public OpcUaPublishActor(
IOpcUaAddressSpaceSink sink,
IServiceLevelPublisher serviceLevel,
bool subscribeRedundancyTopic,
NodeId? localNode)
{ {
Receive<AttributeValueUpdate>(msg => _sink = sink;
_serviceLevel = serviceLevel;
_subscribeRedundancyTopic = subscribeRedundancyTopic;
_localNode = localNode;
Receive<AttributeValueUpdate>(HandleAttributeUpdate);
Receive<AlarmStateUpdate>(HandleAlarmUpdate);
Receive<RebuildAddressSpace>(HandleRebuild);
Receive<ServiceLevelChanged>(HandleServiceLevelChanged);
Receive<RedundancyStateChanged>(HandleRedundancyStateChanged);
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
}
protected override void PreStart()
{
if (_subscribeRedundancyTopic)
{ {
// F10: call into OpcUaApplicationHost to write the address-space node. DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(RedundancyStateTopic, Self));
}
}
private void HandleAttributeUpdate(AttributeValueUpdate msg)
{
try
{
_sink.WriteValue(msg.NodeId, msg.Value, msg.Quality, msg.TimestampUtc);
Interlocked.Increment(ref _writes); Interlocked.Increment(ref _writes);
_log.Debug("OpcUaPublish: queued AttributeValueUpdate for {Node} ({Quality}) (write staged for F10)", }
msg.NodeId, msg.Quality); catch (Exception ex)
});
Receive<AlarmStateUpdate>(msg =>
{ {
_log.Warning(ex, "OpcUaPublish: sink.WriteValue threw for {Node}", msg.NodeId);
}
}
private void HandleAlarmUpdate(AlarmStateUpdate msg)
{
try
{
_sink.WriteAlarmState(msg.AlarmNodeId, msg.Active, msg.Acknowledged, msg.TimestampUtc);
Interlocked.Increment(ref _writes); Interlocked.Increment(ref _writes);
_log.Debug("OpcUaPublish: queued AlarmStateUpdate for {Node} (active={Active})", }
msg.AlarmNodeId, msg.Active); catch (Exception ex)
});
Receive<RebuildAddressSpace>(msg =>
{ {
_log.Info("OpcUaPublish: address-space rebuild requested (correlation={Correlation}); F10 wires the SDK call", _log.Warning(ex, "OpcUaPublish: sink.WriteAlarmState threw for {Node}", msg.AlarmNodeId);
}
}
private void HandleRebuild(RebuildAddressSpace msg)
{
try
{
_sink.RebuildAddressSpace();
_log.Info("OpcUaPublish: address-space rebuilt (correlation={Correlation})", msg.Correlation);
}
catch (Exception ex)
{
_log.Error(ex, "OpcUaPublish: sink.RebuildAddressSpace threw (correlation={Correlation})",
msg.Correlation); msg.Correlation);
}); }
Receive<ServiceLevelChanged>(msg => }
private void HandleServiceLevelChanged(ServiceLevelChanged msg)
{
if (msg.ServiceLevel == _lastServiceLevel) return;
_lastServiceLevel = msg.ServiceLevel;
try
{ {
_log.Debug("OpcUaPublish: ServiceLevel={Level} (write staged for F10)", msg.ServiceLevel); _serviceLevel.Publish(msg.ServiceLevel);
}); _log.Debug("OpcUaPublish: ServiceLevel={Level}", msg.ServiceLevel);
}
catch (Exception ex)
{
_log.Warning(ex, "OpcUaPublish: ServiceLevel publisher threw at level {Level}", msg.ServiceLevel);
}
}
/// <summary>
/// Compute a coarse ServiceLevel from the cluster snapshot and forward to the
/// <see cref="IServiceLevelPublisher"/>. This is a placeholder for F10b's full health
/// aggregation — for now we surface "primary-leader → 240, secondary → 100, detached → 0"
/// so the local SDK at least reflects role state. The full <see cref="ServiceLevelCalculator"/>
/// path (with DB-reachable, OPC UA probe inputs) lives in <c>RedundancyStateActor</c> on
/// admin nodes; this driver-side mirror exists so each node's own SDK exposes a sensible
/// ServiceLevel without round-tripping back through the admin singleton.
/// </summary>
private void HandleRedundancyStateChanged(RedundancyStateChanged msg)
{
if (_localNode is null) return;
var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value);
if (local is null) return;
byte level = local.Role switch
{
RedundancyRole.Primary when local.IsRoleLeaderForDriver => 240,
RedundancyRole.Primary => 200,
RedundancyRole.Secondary => 100,
RedundancyRole.Detached => 0,
_ => 0,
};
Self.Tell(new ServiceLevelChanged(level));
} }
} }

View File

@@ -1,6 +1,9 @@
using System.Collections.Concurrent;
using Akka.Actor; using Akka.Actor;
using Shouldly; using Shouldly;
using Xunit; using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
@@ -13,12 +16,11 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase
public void Accepts_message_contracts_without_pinned_dispatcher_in_tests() public void Accepts_message_contracts_without_pinned_dispatcher_in_tests()
{ {
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests()); var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests());
actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=Tag1", 42.0, OpcUaPublishActor.OpcUaQuality.Good, DateTime.UtcNow)); actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=Tag1", 42.0, OpcUaQuality.Good, DateTime.UtcNow));
actor.Tell(new OpcUaPublishActor.AlarmStateUpdate("ns=2;s=Alarm1", true, false, DateTime.UtcNow)); actor.Tell(new OpcUaPublishActor.AlarmStateUpdate("ns=2;s=Alarm1", true, false, DateTime.UtcNow));
actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));
actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240)); actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240));
// Actor stays alive; no exceptions surface.
ExpectNoMsg(TimeSpan.FromMilliseconds(200)); ExpectNoMsg(TimeSpan.FromMilliseconds(200));
} }
@@ -28,4 +30,135 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase
var props = OpcUaPublishActor.Props(); var props = OpcUaPublishActor.Props();
props.Dispatcher.ShouldBe(OpcUaPublishActor.DispatcherId); props.Dispatcher.ShouldBe(OpcUaPublishActor.DispatcherId);
} }
[Fact]
public void AttributeValueUpdate_routes_to_sink_WriteValue()
{
var sink = new RecordingSink();
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink));
actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=T1", 3.14, OpcUaQuality.Good, DateTime.UtcNow));
actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=T2", "abc", OpcUaQuality.Uncertain, DateTime.UtcNow));
AwaitAssert(() =>
{
sink.Values.Count.ShouldBe(2);
sink.Values[0].NodeId.ShouldBe("ns=2;s=T1");
sink.Values[0].Value.ShouldBe(3.14);
sink.Values[0].Quality.ShouldBe(OpcUaQuality.Good);
sink.Values[1].Quality.ShouldBe(OpcUaQuality.Uncertain);
}, duration: TimeSpan.FromMilliseconds(500));
}
[Fact]
public void AlarmStateUpdate_routes_to_sink_WriteAlarmState()
{
var sink = new RecordingSink();
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink));
actor.Tell(new OpcUaPublishActor.AlarmStateUpdate("ns=2;s=A1", Active: true, Acknowledged: false, DateTime.UtcNow));
AwaitAssert(() =>
{
sink.Alarms.Count.ShouldBe(1);
sink.Alarms[0].AlarmNodeId.ShouldBe("ns=2;s=A1");
sink.Alarms[0].Active.ShouldBeTrue();
sink.Alarms[0].Acknowledged.ShouldBeFalse();
}, duration: TimeSpan.FromMilliseconds(500));
}
[Fact]
public void RebuildAddressSpace_calls_sink_Rebuild()
{
var sink = new RecordingSink();
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink));
actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));
AwaitAssert(() => sink.RebuildCalls.ShouldBe(1), duration: TimeSpan.FromMilliseconds(500));
}
[Fact]
public void ServiceLevelChanged_publishes_to_IServiceLevelPublisher_once_per_unique_level()
{
var publisher = new RecordingPublisher();
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(serviceLevel: publisher));
actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240));
actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240)); // dedup
actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(100));
AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 240, 100 }),
duration: TimeSpan.FromMilliseconds(500));
}
[Fact]
public void RedundancyStateChanged_drives_local_ServiceLevel_publish_for_primary_leader()
{
var publisher = new RecordingPublisher();
var local = NodeId.Parse("primary-node");
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
serviceLevel: publisher, localNode: local));
var snapshot = new RedundancyStateChanged(
Nodes: new[]
{
new NodeRedundancyState(local, RedundancyRole.Primary,
IsClusterLeader: true, IsRoleLeaderForDriver: true, DateTime.UtcNow),
new NodeRedundancyState(NodeId.Parse("other-node"), RedundancyRole.Secondary,
IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow),
},
CorrelationId.NewId());
actor.Tell(snapshot);
AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 240 }),
duration: TimeSpan.FromMilliseconds(500));
}
[Fact]
public void RedundancyStateChanged_for_secondary_publishes_100()
{
var publisher = new RecordingPublisher();
var local = NodeId.Parse("secondary-node");
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(serviceLevel: publisher, localNode: local));
var snapshot = new RedundancyStateChanged(
Nodes: new[]
{
new NodeRedundancyState(local, RedundancyRole.Secondary,
IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow),
},
CorrelationId.NewId());
actor.Tell(snapshot);
AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 100 }),
duration: TimeSpan.FromMilliseconds(500));
}
private sealed class RecordingSink : IOpcUaAddressSpaceSink
{
public ConcurrentQueue<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> ValueQueue { get; } = new();
public ConcurrentQueue<(string AlarmNodeId, bool Active, bool Acknowledged, DateTime Ts)> AlarmQueue { get; } = new();
public int RebuildCalls;
public List<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> Values =>
ValueQueue.ToList();
public List<(string AlarmNodeId, bool Active, bool Acknowledged, DateTime Ts)> Alarms =>
AlarmQueue.ToList();
public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime ts) =>
ValueQueue.Enqueue((nodeId, value, quality, ts));
public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime ts) =>
AlarmQueue.Enqueue((alarmNodeId, active, acknowledged, ts));
public void RebuildAddressSpace() => Interlocked.Increment(ref RebuildCalls);
}
private sealed class RecordingPublisher : IServiceLevelPublisher
{
private readonly ConcurrentQueue<byte> _q = new();
public byte[] Levels => _q.ToArray();
public void Publish(byte serviceLevel) => _q.Enqueue(serviceLevel);
}
} }