Wires the Phase 6.3 Stream B pure-logic pieces (ServiceLevelCalculator,
RecoveryStateManager, ApplyLeaseRegistry) + Stream A topology loader
(RedundancyCoordinator) into one orchestrator the runtime + OPC UA node
surface consume. The actual OPC UA variable-node plumbing (mapping
ServiceLevel Byte + ServerUriArray String[] onto the Opc.Ua.Server stack)
is a narrower follow-up on top of this — the publisher emits change events
the OPC UA layer subscribes to.
Server.Redundancy additions:
- PeerReachability record + PeerReachabilityTracker — thread-safe
per-peer-NodeId holder of the latest (HttpHealthy, UaHealthy) tuple. Probe
loops (Stream B.1/B.2 runtime follow-up) write via Update; the publisher
reads via Get. PeerReachability.FullyHealthy / Unknown sentinels for the
two most-common states.
- RedundancyStatePublisher — pure orchestrator, no background timer, no OPC
UA stack dep. ComputeAndPublish reads the 8 inputs + calls the calculator:
* role (from coordinator.Current.SelfRole)
* selfHealthy (caller-supplied Func<bool>)
* peerHttpHealthy + peerUaHealthy (aggregate across all peers in
coordinator.Current.Peers)
* applyInProgress (ApplyLeaseRegistry.IsApplyInProgress)
* recoveryDwellMet (RecoveryStateManager.IsDwellMet)
* topologyValid (coordinator.IsTopologyValid)
* operatorMaintenance (caller-supplied Func<bool>)
Before-coordinator-init returns NoData=1 so clients never see an
authoritative value from an un-bootstrapped server.
OnStateChanged event fires edge-triggered when the byte changes;
OnServerUriArrayChanged fires edge-triggered when the topology's self-first
peer-sorted URI array content changes.
- ServiceLevelSnapshot record — per-tick output with Value + Band +
Topology. The OPC UA layer's ServiceLevel Byte node subscribes to
OnStateChanged; the ServerUriArray node subscribes to OnServerUriArrayChanged.
Tests (8 new RedundancyStatePublisherTests, all pass):
- Before-init returns NoData (Value=1, Band=NoData).
- Authoritative-Primary when healthy + peer fully reachable.
- Isolated-Primary (230) retains authority when peer unreachable — matches
decision #154 non-promotion semantics.
- Mid-apply band dominates: open lease → Value=200 even with peer healthy.
- Self-unhealthy → NoData regardless of other inputs.
- OnStateChanged fires only on value transitions (edge-triggered).
- OnServerUriArrayChanged fires once per topology content change; repeat
ticks with same topology don't re-emit.
- Standalone cluster treats healthy as AuthoritativePrimary=255.
Microsoft.EntityFrameworkCore.InMemory 10.0.0 added to Server.Tests for the
coordinator-backed publisher tests.
Full solution dotnet test: 1186 passing (was 1178, +8). Pre-existing
Client.CLI Subscribe flake unchanged.
Closes the core of release blocker #3 — the pure-logic + orchestration
layer now exists + is unit-tested. Remaining Stream C surfaces: OPC UA
ServiceLevel Byte variable wiring (binds to OnStateChanged), ServerUriArray
String[] wiring (binds to OnServerUriArrayChanged), RedundancySupport
static from RedundancyMode. Those touch the OPC UA stack directly + land
as Stream C.2 follow-up.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
143 lines
5.9 KiB
C#
143 lines
5.9 KiB
C#
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;
|
|
|
|
/// <summary>
/// Orchestrates Phase 6.3 Stream C: feeds the <see cref="ServiceLevelCalculator"/> with the
/// current (role, self health, peer reachability, apply-in-progress, recovery dwell,
/// topology validity, operator maintenance) inputs and hands the resulting
/// <see cref="byte"/> + labelled <see cref="ServiceLevelBand"/> to subscribers. The OPC UA
/// <c>ServiceLevel</c> variable node consumes this via <see cref="OnStateChanged"/>, which is
/// raised only when the byte changes — not on every tick.
/// </summary>
/// <remarks>
/// <para>
/// Pure orchestration — no background timer, no OPC UA stack dep. The caller (a
/// HostedService in a future PR, or a test) drives <see cref="ComputeAndPublish"/> at
/// whatever cadence is appropriate. Each call reads the inputs + recomputes the ServiceLevel
/// byte; <see cref="OnStateChanged"/> is raised when the byte differs from the last emitted
/// value (edge-triggered). <see cref="OnServerUriArrayChanged"/> is raised whenever the
/// topology's <c>ServerUriArray</c> content changes.
/// </para>
/// <para>
/// The last-emitted byte is seeded with 255, so a first tick that computes 255 does not
/// raise <see cref="OnStateChanged"/>. Consumers that need the initial value should take it
/// from the return value of <see cref="ComputeAndPublish"/> or from <see cref="LastByte"/>.
/// </para>
/// <para>
/// NOTE(review): the last-emitted state is not synchronized in this class; it presumably
/// expects a single driving caller — confirm before ticking it from multiple threads.
/// </para>
/// </remarks>
public sealed class RedundancyStatePublisher
{
    private readonly RedundancyCoordinator _coordinator;
    private readonly ApplyLeaseRegistry _leases;
    private readonly RecoveryStateManager _recovery;
    private readonly PeerReachabilityTracker _peers;
    private readonly Func<bool> _selfHealthy;
    private readonly Func<bool> _operatorMaintenance;

    // Seeded at 255 (Authoritative). Consequence: the first tick only raises OnStateChanged
    // when the computed byte is something other than 255.
    private byte _lastByte = 255;

    // Null until the first tick that has a topology; that first tick always raises
    // OnServerUriArrayChanged.
    private IReadOnlyList<string>? _lastServerUriArray;

    /// <summary>Creates a publisher over the supplied redundancy inputs.</summary>
    /// <param name="coordinator">Source of the current topology and its validity flag.</param>
    /// <param name="leases">Source of the apply-in-progress flag.</param>
    /// <param name="recovery">Source of the recovery-dwell flag.</param>
    /// <param name="peers">Tracker queried for each peer's latest reachability.</param>
    /// <param name="selfHealthy">
    /// Probe for this node's own health; when omitted the node is always reported healthy.
    /// </param>
    /// <param name="operatorMaintenance">
    /// Probe for operator-requested maintenance; when omitted maintenance is never reported.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="coordinator"/>, <paramref name="leases"/>, <paramref name="recovery"/>
    /// or <paramref name="peers"/> is <c>null</c>.
    /// </exception>
    public RedundancyStatePublisher(
        RedundancyCoordinator coordinator,
        ApplyLeaseRegistry leases,
        RecoveryStateManager recovery,
        PeerReachabilityTracker peers,
        Func<bool>? selfHealthy = null,
        Func<bool>? operatorMaintenance = null)
    {
        ArgumentNullException.ThrowIfNull(coordinator);
        ArgumentNullException.ThrowIfNull(leases);
        ArgumentNullException.ThrowIfNull(recovery);
        ArgumentNullException.ThrowIfNull(peers);

        _coordinator = coordinator;
        _leases = leases;
        _recovery = recovery;
        _peers = peers;
        _selfHealthy = selfHealthy ?? (() => true);
        _operatorMaintenance = operatorMaintenance ?? (() => false);
    }

    /// <summary>
    /// Raised from <see cref="ComputeAndPublish"/> with the current ServiceLevel byte + band,
    /// but only when the byte differs from the previously-emitted one.
    /// </summary>
    public event Action<ServiceLevelSnapshot>? OnStateChanged;

    /// <summary>
    /// Raised when the cluster's ServerUriArray (self + peers) content changes — e.g. an
    /// operator adds or removes a peer. Consumer is the OPC UA <c>ServerUriArray</c>
    /// variable node in Stream C.2.
    /// </summary>
    public event Action<IReadOnlyList<string>>? OnServerUriArrayChanged;

    /// <summary>
    /// Snapshot of the last-published ServiceLevel byte — diagnostics + tests. Reads 255
    /// until a tick computes a different value.
    /// </summary>
    public byte LastByte => _lastByte;

    /// <summary>
    /// Compute the current ServiceLevel + emit change events if anything moved. Caller
    /// drives cadence — a 1 s tick in production is reasonable; tests drive it directly.
    /// </summary>
    /// <returns>
    /// The snapshot for this tick, returned whether or not <see cref="OnStateChanged"/> was
    /// raised. While the coordinator has no current topology the snapshot carries the
    /// <see cref="ServiceLevelBand.NoData"/> byte and a <c>null</c> topology.
    /// </returns>
    public ServiceLevelSnapshot ComputeAndPublish()
    {
        var topology = _coordinator.Current;
        if (topology is null)
        {
            // Not yet initialized — surface NoData so clients don't treat us as authoritative.
            return Emit((byte)ServiceLevelBand.NoData, null);
        }

        // Aggregate peer reachability: each flag is true only when every peer reports it
        // (and therefore true when there are no peers). Each peer's record is fetched once
        // so both flags come from the same reachability sample rather than from two reads
        // that a concurrent tracker update could land between.
        var peerUaHealthy = true;
        var peerHttpHealthy = true;
        foreach (var peer in topology.Peers)
        {
            var reachability = _peers.Get(peer.NodeId);
            peerUaHealthy &= reachability.UaHealthy;
            peerHttpHealthy &= reachability.HttpHealthy;
        }

        var role = MapRole(topology.SelfRole);

        var value = ServiceLevelCalculator.Compute(
            role: role,
            selfHealthy: _selfHealthy(),
            peerUaHealthy: peerUaHealthy,
            peerHttpHealthy: peerHttpHealthy,
            applyInProgress: _leases.IsApplyInProgress,
            recoveryDwellMet: _recovery.IsDwellMet(),
            topologyValid: _coordinator.IsTopologyValid,
            operatorMaintenance: _operatorMaintenance());

        // URI-array subscribers are notified before state subscribers.
        MaybeFireServerUriArray(topology);
        return Emit(value, topology);
    }

    /// <summary>
    /// Normalizes the configured role for the calculator: Primary and Secondary pass through
    /// unchanged; every other value is handed over as Standalone.
    /// </summary>
    /// <param name="role">The role declared for this node in the topology.</param>
    /// <returns>The role passed to <see cref="ServiceLevelCalculator"/>.</returns>
    private static RedundancyRole MapRole(RedundancyRole role) => role switch
    {
        RedundancyRole.Primary => RedundancyRole.Primary,
        RedundancyRole.Secondary => RedundancyRole.Secondary,

        // Standalone and any unrecognized value. NOTE(review): the calculator is said to
        // special-case Standalone inside Compute — not visible from this file; confirm.
        _ => RedundancyRole.Standalone,
    };

    /// <summary>
    /// Builds the snapshot for <paramref name="value"/> and raises
    /// <see cref="OnStateChanged"/> when the byte differs from the last emitted one.
    /// </summary>
    /// <param name="value">ServiceLevel byte computed for this tick.</param>
    /// <param name="topology">Topology the byte was computed against, or <c>null</c>.</param>
    /// <returns>The snapshot, regardless of whether the event was raised.</returns>
    private ServiceLevelSnapshot Emit(byte value, RedundancyTopology? topology)
    {
        var snap = new ServiceLevelSnapshot(
            Value: value,
            Band: ServiceLevelCalculator.Classify(value),
            Topology: topology);

        if (value != _lastByte)
        {
            // Record the new byte before invoking handlers so LastByte already reflects it
            // inside a handler.
            _lastByte = value;
            OnStateChanged?.Invoke(snap);
        }

        return snap;
    }

    /// <summary>
    /// Raises <see cref="OnServerUriArrayChanged"/> when the topology's ServerUriArray
    /// differs (ordinal, order-sensitive comparison) from the last one emitted, or when
    /// nothing has been emitted yet.
    /// </summary>
    /// <param name="topology">Topology whose ServerUriArray is inspected.</param>
    private void MaybeFireServerUriArray(RedundancyTopology topology)
    {
        var current = topology.ServerUriArray();
        if (_lastServerUriArray is null || !current.SequenceEqual(_lastServerUriArray, StringComparer.Ordinal))
        {
            _lastServerUriArray = current;
            OnServerUriArrayChanged?.Invoke(current);
        }
    }
}
|
|
|
|
/// <summary>
/// Result returned by each <see cref="RedundancyStatePublisher.ComputeAndPublish"/> call and
/// passed to <see cref="RedundancyStatePublisher.OnStateChanged"/> subscribers when the byte
/// changes.
/// </summary>
/// <param name="Value">ServiceLevel byte computed for the tick.</param>
/// <param name="Band">Band label obtained by classifying <paramref name="Value"/>.</param>
/// <param name="Topology">
/// Topology the value was computed against; <c>null</c> when the coordinator had no current
/// topology at the time of the tick.
/// </param>
public sealed record ServiceLevelSnapshot(byte Value, ServiceLevelBand Band, RedundancyTopology? Topology);
|