feat(redundancy): OpcUaPublishActor computes ServiceLevel via calculator (DB+stale+leader; legacy seam)
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Akka.Event;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using ZB.MOM.WW.OtOpcUa.Cluster.Redundancy;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
@@ -9,6 +11,7 @@ using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||
|
||||
@@ -54,10 +57,15 @@ public sealed class OpcUaPublishActor : ReceiveActor
|
||||
private readonly NodeId? _localNode;
|
||||
private readonly IDbContextFactory<OtOpcUaConfigDbContext>? _dbFactory;
|
||||
private readonly Phase7Applier? _applier;
|
||||
private readonly IActorRef? _dbHealthProbe;
|
||||
private readonly TimeSpan _staleWindow;
|
||||
private readonly Akka.Cluster.Cluster _cluster = Akka.Cluster.Cluster.Get(Context.System);
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
|
||||
private int _writes;
|
||||
private byte _lastServiceLevel;
|
||||
private DbHealthProbeActor.DbHealthStatus? _lastDbHealth;
|
||||
private RedundancyStateChanged? _lastSnapshot;
|
||||
private Phase7CompositionResult _lastApplied = new(
|
||||
Array.Empty<UnsAreaProjection>(),
|
||||
Array.Empty<UnsLineProjection>(),
|
||||
@@ -74,25 +82,35 @@ public sealed class OpcUaPublishActor : ReceiveActor
|
||||
/// <c>redundancy-state</c> DPS topic so cluster transitions drive the local ServiceLevel
|
||||
/// publish path. When <paramref name="dbFactory"/> + <paramref name="applier"/> are supplied,
|
||||
/// <see cref="RebuildAddressSpace"/> reads the latest deployment artifact + drives the
|
||||
/// applier through the sink.</summary>
|
||||
/// applier through the sink. When <paramref name="dbHealthProbe"/> is supplied the local
|
||||
/// ServiceLevel is computed via <see cref="ServiceLevelCalculator"/> from real DB-health +
|
||||
/// staleness + role-leader inputs; otherwise the legacy role-only switch is used.</summary>
|
||||
/// <param name="sink">The OPC UA address space sink.</param>
|
||||
/// <param name="serviceLevel">The service level publisher.</param>
|
||||
/// <param name="localNode">The local cluster node ID.</param>
|
||||
/// <param name="dbFactory">The optional database context factory.</param>
|
||||
/// <param name="applier">The optional Phase 7 applier.</param>
|
||||
/// <param name="dbHealthProbe">The optional <see cref="DbHealthProbeActor"/> ref; when null the
|
||||
/// legacy role-only ServiceLevel seam is used until a <see cref="DbHealthProbeActor.DbHealthStatus"/> arrives.</param>
|
||||
/// <param name="staleWindow">The window beyond which a DB-health sample or redundancy snapshot is
|
||||
/// considered stale; defaults to 30 seconds.</param>
|
||||
public static Props Props(
|
||||
IOpcUaAddressSpaceSink? sink = null,
|
||||
IServiceLevelPublisher? serviceLevel = null,
|
||||
NodeId? localNode = null,
|
||||
IDbContextFactory<OtOpcUaConfigDbContext>? dbFactory = null,
|
||||
Phase7Applier? applier = null) =>
|
||||
Phase7Applier? applier = null,
|
||||
IActorRef? dbHealthProbe = null,
|
||||
TimeSpan? staleWindow = null) =>
|
||||
Akka.Actor.Props.Create(() => new OpcUaPublishActor(
|
||||
sink ?? NullOpcUaAddressSpaceSink.Instance,
|
||||
serviceLevel ?? NullServiceLevelPublisher.Instance,
|
||||
subscribeRedundancyTopic: true,
|
||||
localNode,
|
||||
dbFactory,
|
||||
applier)).WithDispatcher(DispatcherId);
|
||||
applier,
|
||||
dbHealthProbe,
|
||||
staleWindow)).WithDispatcher(DispatcherId);
|
||||
|
||||
/// <summary>Test-only Props that omits the pinned-dispatcher requirement and skips the
|
||||
/// DPS subscribe so unit tests can spin up the actor on a vanilla TestKit cluster.</summary>
|
||||
@@ -102,20 +120,28 @@ public sealed class OpcUaPublishActor : ReceiveActor
|
||||
/// <param name="localNode">The local cluster node ID.</param>
|
||||
/// <param name="dbFactory">The optional database context factory.</param>
|
||||
/// <param name="applier">The optional Phase 7 applier.</param>
|
||||
/// <param name="dbHealthProbe">The optional <see cref="DbHealthProbeActor"/> ref; when null the
|
||||
/// legacy role-only ServiceLevel seam is used until a <see cref="DbHealthProbeActor.DbHealthStatus"/> arrives.</param>
|
||||
/// <param name="staleWindow">The window beyond which a DB-health sample or redundancy snapshot is
|
||||
/// considered stale; defaults to 30 seconds.</param>
|
||||
public static Props PropsForTests(
|
||||
IOpcUaAddressSpaceSink? sink = null,
|
||||
IServiceLevelPublisher? serviceLevel = null,
|
||||
bool subscribeRedundancyTopic = false,
|
||||
NodeId? localNode = null,
|
||||
IDbContextFactory<OtOpcUaConfigDbContext>? dbFactory = null,
|
||||
Phase7Applier? applier = null) =>
|
||||
Phase7Applier? applier = null,
|
||||
IActorRef? dbHealthProbe = null,
|
||||
TimeSpan? staleWindow = null) =>
|
||||
Akka.Actor.Props.Create(() => new OpcUaPublishActor(
|
||||
sink ?? NullOpcUaAddressSpaceSink.Instance,
|
||||
serviceLevel ?? NullServiceLevelPublisher.Instance,
|
||||
subscribeRedundancyTopic,
|
||||
localNode,
|
||||
dbFactory,
|
||||
applier));
|
||||
applier,
|
||||
dbHealthProbe,
|
||||
staleWindow));
|
||||
|
||||
/// <summary>Initializes a new instance of the <see cref="OpcUaPublishActor"/> class.</summary>
|
||||
/// <param name="sink">The OPC UA address space sink.</param>
|
||||
@@ -124,13 +150,19 @@ public sealed class OpcUaPublishActor : ReceiveActor
|
||||
/// <param name="localNode">The local cluster node ID.</param>
|
||||
/// <param name="dbFactory">The optional database context factory.</param>
|
||||
/// <param name="applier">The optional Phase 7 applier.</param>
|
||||
/// <param name="dbHealthProbe">The optional <see cref="DbHealthProbeActor"/> ref; when null the
|
||||
/// legacy role-only ServiceLevel seam is used until a <see cref="DbHealthProbeActor.DbHealthStatus"/> arrives.</param>
|
||||
/// <param name="staleWindow">The window beyond which a DB-health sample or redundancy snapshot is
|
||||
/// considered stale; defaults to 30 seconds.</param>
|
||||
public OpcUaPublishActor(
|
||||
IOpcUaAddressSpaceSink sink,
|
||||
IServiceLevelPublisher serviceLevel,
|
||||
bool subscribeRedundancyTopic,
|
||||
NodeId? localNode,
|
||||
IDbContextFactory<OtOpcUaConfigDbContext>? dbFactory = null,
|
||||
Phase7Applier? applier = null)
|
||||
Phase7Applier? applier = null,
|
||||
IActorRef? dbHealthProbe = null,
|
||||
TimeSpan? staleWindow = null)
|
||||
{
|
||||
_sink = sink;
|
||||
_serviceLevel = serviceLevel;
|
||||
@@ -138,12 +170,15 @@ public sealed class OpcUaPublishActor : ReceiveActor
|
||||
_localNode = localNode;
|
||||
_dbFactory = dbFactory;
|
||||
_applier = applier;
|
||||
_dbHealthProbe = dbHealthProbe;
|
||||
_staleWindow = staleWindow ?? TimeSpan.FromSeconds(30);
|
||||
|
||||
Receive<AttributeValueUpdate>(HandleAttributeUpdate);
|
||||
Receive<AlarmStateUpdate>(HandleAlarmUpdate);
|
||||
Receive<RebuildAddressSpace>(HandleRebuild);
|
||||
Receive<ServiceLevelChanged>(HandleServiceLevelChanged);
|
||||
Receive<RedundancyStateChanged>(HandleRedundancyStateChanged);
|
||||
Receive<DbHealthProbeActor.DbHealthStatus>(HandleDbHealthStatus);
|
||||
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
||||
}
|
||||
|
||||
@@ -319,30 +354,91 @@ public sealed class OpcUaPublishActor : ReceiveActor
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Compute a coarse ServiceLevel from the cluster snapshot and forward to the
|
||||
/// <see cref="IServiceLevelPublisher"/>. This is a placeholder for F10b's full health
|
||||
/// aggregation — for now we surface "primary-leader → 240, secondary → 100, detached → 0"
|
||||
/// so the local SDK at least reflects role state. The full <see cref="ServiceLevelCalculator"/>
|
||||
/// path (with DB-reachable, OPC UA probe inputs) lives in <c>RedundancyStateActor</c> on
|
||||
/// admin nodes; this driver-side mirror exists so each node's own SDK exposes a sensible
|
||||
/// ServiceLevel without round-tripping back through the admin singleton.
|
||||
/// </summary>
|
||||
/// <summary>Caches the latest redundancy snapshot and recomputes the local ServiceLevel.
|
||||
/// The actual byte is produced by <see cref="RecomputeServiceLevel"/> — either via the
|
||||
/// health-aware <see cref="ServiceLevelCalculator"/> (once a DB-health probe + sample are
|
||||
/// wired) or via the legacy role-only seam (back-compat / bootstrap).</summary>
|
||||
private void HandleRedundancyStateChanged(RedundancyStateChanged msg)
|
||||
{
|
||||
if (_localNode is null) return;
|
||||
_lastSnapshot = msg;
|
||||
RecomputeServiceLevel();
|
||||
}
|
||||
|
||||
var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value);
|
||||
if (local is null) return;
|
||||
/// <summary>Caches the latest DB-health sample and recomputes the local ServiceLevel. The
|
||||
/// probe pushes these (or the actor Asks for them); either way the freshest sample feeds the
|
||||
/// calculator's <c>DbReachable</c>/<c>Stale</c> inputs.</summary>
|
||||
private void HandleDbHealthStatus(DbHealthProbeActor.DbHealthStatus msg)
|
||||
{
|
||||
_lastDbHealth = msg;
|
||||
RecomputeServiceLevel();
|
||||
}
|
||||
|
||||
byte level = local.Role switch
|
||||
/// <summary>
|
||||
/// Computes the local OPC UA ServiceLevel and routes it through <see cref="ServiceLevelChanged"/>
|
||||
/// (the dedup/publish/metric handler). The full <see cref="ServiceLevelCalculator"/> path is
|
||||
/// used once a DB-health probe is wired AND a sample has arrived; until then (and when no probe
|
||||
/// is supplied at all) a legacy role-only seam keeps the historical "primary-leader → 240,
|
||||
/// secondary → 100, detached → 0" behaviour. The calculator does not model Detached, so a
|
||||
/// detached local node is guarded to 0 before either path runs.
|
||||
/// </summary>
|
||||
private void RecomputeServiceLevel()
|
||||
{
|
||||
if (_localNode is null || _lastSnapshot is null) return;
|
||||
|
||||
var entry = _lastSnapshot.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value);
|
||||
|
||||
// The calculator does NOT model Detached — a healthy detached node would wrongly compute
|
||||
// 240, so guard it (and the missing-entry case) to 0 here.
|
||||
if (entry is null || entry.Role == RedundancyRole.Detached)
|
||||
{
|
||||
RedundancyRole.Primary when local.IsRoleLeaderForDriver => 240,
|
||||
RedundancyRole.Primary => 200,
|
||||
RedundancyRole.Secondary => 100,
|
||||
RedundancyRole.Detached => 0,
|
||||
_ => 0,
|
||||
};
|
||||
Self.Tell(new ServiceLevelChanged(level));
|
||||
Self.Tell(new ServiceLevelChanged(0));
|
||||
return;
|
||||
}
|
||||
|
||||
// Legacy / back-compat seam: with no DB-health probe wired (or before the first sample
|
||||
// arrives) fall back to the old role-only switch. This preserves historical behaviour and
|
||||
// is the bootstrap value until the first DbHealthStatus lands.
|
||||
if (_dbHealthProbe is null || _lastDbHealth is null)
|
||||
{
|
||||
Self.Tell(new ServiceLevelChanged(LegacyRoleOnly(entry)));
|
||||
return;
|
||||
}
|
||||
|
||||
var now = DateTime.UtcNow;
|
||||
var inputs = new NodeHealthInputs(
|
||||
MemberState: SafeSelfStatus(),
|
||||
DbReachable: _lastDbHealth.Reachable,
|
||||
OpcUaProbeOk: true, // TODO(2b): wire the real OPC UA self-probe result here.
|
||||
Stale: !_lastDbHealth.Reachable
|
||||
|| (now - _lastDbHealth.AsOfUtc) > _staleWindow
|
||||
|| (now - entry.AsOfUtc) > _staleWindow,
|
||||
IsDriverRoleLeader: entry.IsRoleLeaderForDriver);
|
||||
|
||||
Self.Tell(new ServiceLevelChanged(ServiceLevelCalculator.Compute(inputs)));
|
||||
}
|
||||
|
||||
/// <summary>The legacy role-only ServiceLevel switch (primary-leader → 240, primary → 200,
|
||||
/// secondary → 100, _ → 0). Preserved as the back-compat / bootstrap seam.</summary>
|
||||
private static byte LegacyRoleOnly(NodeRedundancyState entry) => entry.Role switch
|
||||
{
|
||||
RedundancyRole.Primary when entry.IsRoleLeaderForDriver => 240,
|
||||
RedundancyRole.Primary => 200,
|
||||
RedundancyRole.Secondary => 100,
|
||||
_ => 0,
|
||||
};
|
||||
|
||||
/// <summary>Reads this node's cluster <see cref="MemberStatus"/>, returning
|
||||
/// <see cref="MemberStatus.Removed"/> if the cluster is unavailable (so the calculator treats it
|
||||
/// as untrusted → 0 rather than throwing).</summary>
|
||||
private MemberStatus SafeSelfStatus()
|
||||
{
|
||||
try
|
||||
{
|
||||
return _cluster.SelfMember.Status;
|
||||
}
|
||||
catch
|
||||
{
|
||||
return MemberStatus.Removed;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user