From c4824bea12d69af9e98ae363e9058116c9d13b08 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 11:31:50 -0400 Subject: [PATCH] =?UTF-8?q?Phase=206.3=20Stream=20C=20core=20=E2=80=94=20R?= =?UTF-8?q?edundancyStatePublisher=20+=20PeerReachability;=20orchestrates?= =?UTF-8?q?=20calculator=20inputs=20end-to-end?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the Phase 6.3 Stream B pure-logic pieces (ServiceLevelCalculator, RecoveryStateManager, ApplyLeaseRegistry) + Stream A topology loader (RedundancyCoordinator) into one orchestrator the runtime + OPC UA node surface consume. The actual OPC UA variable-node plumbing (mapping ServiceLevel Byte + ServerUriArray String[] onto the Opc.Ua.Server stack) is a narrower follow-up on top of this — the publisher emits change events the OPC UA layer subscribes to. Server.Redundancy additions: - PeerReachability record + PeerReachabilityTracker — thread-safe per-peer-NodeId holder of the latest (HttpHealthy, UaHealthy) tuple. Probe loops (Stream B.1/B.2 runtime follow-up) write via Update; the publisher reads via Get. PeerReachability.FullyHealthy / Unknown sentinels for the two most-common states. - RedundancyStatePublisher — pure orchestrator, no background timer, no OPC UA stack dep. ComputeAndPublish reads the 8 inputs + calls the calculator: * role (from coordinator.Current.SelfRole) * selfHealthy (caller-supplied Func) * peerHttpHealthy + peerUaHealthy (aggregate across all peers in coordinator.Current.Peers) * applyInProgress (ApplyLeaseRegistry.IsApplyInProgress) * recoveryDwellMet (RecoveryStateManager.IsDwellMet) * topologyValid (coordinator.IsTopologyValid) * operatorMaintenance (caller-supplied Func) Before-coordinator-init returns NoData=1 so clients never see an authoritative value from an un-bootstrapped server. 
OnStateChanged event fires edge-triggered when the byte changes; OnServerUriArrayChanged fires edge-triggered when the topology's self-first peer-sorted URI array content changes. - ServiceLevelSnapshot record — per-tick output with Value + Band + Topology. The OPC UA layer's ServiceLevel Byte node subscribes to OnStateChanged; the ServerUriArray node subscribes to OnServerUriArrayChanged. Tests (8 new RedundancyStatePublisherTests, all pass): - Before-init returns NoData (Value=1, Band=NoData). - Authoritative-Primary when healthy + peer fully reachable. - Isolated-Primary (230) retains authority when peer unreachable — matches decision #154 non-promotion semantics. - Mid-apply band dominates: open lease → Value=200 even with peer healthy. - Self-unhealthy → NoData regardless of other inputs. - OnStateChanged fires only on value transitions (edge-triggered). - OnServerUriArrayChanged fires once per topology content change; repeat ticks with same topology don't re-emit. - Standalone cluster treats healthy as AuthoritativePrimary=255. Microsoft.EntityFrameworkCore.InMemory 10.0.0 added to Server.Tests for the coordinator-backed publisher tests. Full solution dotnet test: 1186 passing (was 1178, +8). Pre-existing Client.CLI Subscribe flake unchanged. Closes the core of release blocker #3 — the pure-logic + orchestration layer now exists + is unit-tested. Remaining Stream C surfaces: OPC UA ServiceLevel Byte variable wiring (binds to OnStateChanged), ServerUriArray String[] wiring (binds to OnServerUriArrayChanged), RedundancySupport static from RedundancyMode. Those touch the OPC UA stack directly + land as Stream C.2 follow-up. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Redundancy/PeerReachability.cs | 42 ++++ .../Redundancy/RedundancyStatePublisher.cs | 142 ++++++++++++ .../RedundancyStatePublisherTests.cs | 213 ++++++++++++++++++ .../ZB.MOM.WW.OtOpcUa.Server.Tests.csproj | 1 + 4 files changed, 398 insertions(+) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/PeerReachability.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/RedundancyStatePublisher.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Server.Tests/RedundancyStatePublisherTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/PeerReachability.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/PeerReachability.cs new file mode 100644 index 0000000..7a623c0 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/PeerReachability.cs @@ -0,0 +1,42 @@ +namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy; + +/// <summary> +/// Latest observed reachability of the peer node per the Phase 6.3 Stream B.1/B.2 two-layer +/// probe model. HTTP layer is the fast-fail; UA layer is authoritative. +/// </summary> +/// <remarks> +/// Fed into the <see cref="ServiceLevelCalculator"/> as peerHttpHealthy + +/// peerUaHealthy. The concrete probe loops (PeerHttpProbeLoop + +/// PeerUaProbeLoop) live in a Stream B runtime follow-up — this type is the +/// contract the publisher reads; probers write via +/// <see cref="PeerReachabilityTracker.Update"/>. +/// </remarks> +public sealed record PeerReachability(bool HttpHealthy, bool UaHealthy) +{ + public static readonly PeerReachability Unknown = new(false, false); + public static readonly PeerReachability FullyHealthy = new(true, true); + + /// <summary>True when both probes report healthy — the ServiceLevelCalculator's peerReachable gate.</summary> + public bool BothHealthy => HttpHealthy && UaHealthy; +} + +/// <summary> +/// Thread-safe holder of the latest <see cref="PeerReachability"/> per peer NodeId. Probe +/// loops call <see cref="Update"/>; the <see cref="RedundancyStatePublisher"/> reads via +/// <see cref="Get"/>. 
+/// </summary> +public sealed class PeerReachabilityTracker +{ + private readonly System.Collections.Concurrent.ConcurrentDictionary _byPeer = + new(StringComparer.OrdinalIgnoreCase); + + public void Update(string peerNodeId, PeerReachability reachability) + { + ArgumentException.ThrowIfNullOrWhiteSpace(peerNodeId); + _byPeer[peerNodeId] = reachability ?? throw new ArgumentNullException(nameof(reachability)); + } + + /// <summary>Current reachability for a peer. Returns <see cref="PeerReachability.Unknown"/> when not yet probed.</summary> + public PeerReachability Get(string peerNodeId) => + _byPeer.TryGetValue(peerNodeId, out var r) ? r : PeerReachability.Unknown; +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/RedundancyStatePublisher.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/RedundancyStatePublisher.cs new file mode 100644 index 0000000..2ab6654 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/RedundancyStatePublisher.cs @@ -0,0 +1,142 @@ +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; + +namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy; + +/// <summary> +/// Orchestrates Phase 6.3 Stream C: feeds the <see cref="ServiceLevelCalculator"/> with the +/// current (topology, peer reachability, apply-in-progress, recovery dwell, self health) +/// inputs and emits the resulting <see cref="byte"/> + labelled <see cref="ServiceLevelBand"/> +/// to subscribers. The OPC UA ServiceLevel variable node consumes this via +/// <see cref="OnStateChanged"/>, raised on ticks where the byte changed. +/// </summary> +/// <remarks> +/// Pure orchestration — no background timer, no OPC UA stack dep. The caller (a +/// HostedService in a future PR, or a test) drives <see cref="ComputeAndPublish"/> at +/// whatever cadence is appropriate. Each call reads the inputs + recomputes the ServiceLevel +/// byte; state is fired on the <see cref="OnStateChanged"/> event when the byte differs from +/// the last emitted value (edge-triggered). The <see cref="OnServerUriArrayChanged"/> event +/// fires whenever the topology's ServerUriArray content changes. 
+/// </remarks> +public sealed class RedundancyStatePublisher +{ + private readonly RedundancyCoordinator _coordinator; + private readonly ApplyLeaseRegistry _leases; + private readonly RecoveryStateManager _recovery; + private readonly PeerReachabilityTracker _peers; + private readonly Func _selfHealthy; + private readonly Func _operatorMaintenance; + private byte _lastByte = 255; // seeded at 255, so a first tick that computes 255 raises no OnStateChanged (Emit only fires on change) + private IReadOnlyList? _lastServerUriArray; + + public RedundancyStatePublisher( + RedundancyCoordinator coordinator, + ApplyLeaseRegistry leases, + RecoveryStateManager recovery, + PeerReachabilityTracker peers, + Func? selfHealthy = null, + Func? operatorMaintenance = null) + { + ArgumentNullException.ThrowIfNull(coordinator); + ArgumentNullException.ThrowIfNull(leases); + ArgumentNullException.ThrowIfNull(recovery); + ArgumentNullException.ThrowIfNull(peers); + + _coordinator = coordinator; + _leases = leases; + _recovery = recovery; + _peers = peers; + _selfHealthy = selfHealthy ?? (() => true); + _operatorMaintenance = operatorMaintenance ?? (() => false); + } + + /// <summary> + /// Fires with the current ServiceLevel byte + band on every call to + /// <see cref="ComputeAndPublish"/> when the byte differs from the previously-emitted one. + /// </summary> + public event Action? OnStateChanged; + + /// <summary> + /// Fires when the cluster's ServerUriArray (self + peers) content changes — e.g. an + /// operator adds or removes a peer — and on the first tick that sees a topology. Consumer is the OPC UA ServerUriArray + /// variable node in Stream C.2. + /// </summary> + public event Action>? OnServerUriArrayChanged; + + /// <summary>Snapshot of the last-published ServiceLevel byte (255 before the first tick) — diagnostics + tests.</summary> + public byte LastByte => _lastByte; + + /// <summary> + /// Compute the current ServiceLevel + emit change events if anything moved. Caller + /// drives cadence — a 1 s tick in production is reasonable; tests drive it directly. 
+ /// </summary> + public ServiceLevelSnapshot ComputeAndPublish() + { + var topology = _coordinator.Current; + if (topology is null) + { + // Not yet initialized — surface NoData so clients don't treat us as authoritative. + return Emit((byte)ServiceLevelBand.NoData, null); + } + + // Aggregate peer reachability. For 2-node v2.0 clusters there is at most one peer; each flag is true + // only when every peer reports it (All() is true when there are no peers). NOTE(review): peerReachable is never read below. + var peerReachable = topology.Peers.All(p => _peers.Get(p.NodeId).BothHealthy); + var peerUaHealthy = topology.Peers.All(p => _peers.Get(p.NodeId).UaHealthy); + var peerHttpHealthy = topology.Peers.All(p => _peers.Get(p.NodeId).HttpHealthy); + + var role = MapRole(topology.SelfRole); + + var value = ServiceLevelCalculator.Compute( + role: role, + selfHealthy: _selfHealthy(), + peerUaHealthy: peerUaHealthy, + peerHttpHealthy: peerHttpHealthy, + applyInProgress: _leases.IsApplyInProgress, + recoveryDwellMet: _recovery.IsDwellMet(), + topologyValid: _coordinator.IsTopologyValid, + operatorMaintenance: _operatorMaintenance()); + + MaybeFireServerUriArray(topology); + return Emit(value, topology); + } + + private static RedundancyRole MapRole(RedundancyRole role) => role switch + { + // Primary and Secondary pass through unchanged; every other value (Standalone included) is + // forwarded as Standalone. NOTE(review): presumably the calculator special-cases Standalone — confirm. + RedundancyRole.Primary => RedundancyRole.Primary, + RedundancyRole.Secondary => RedundancyRole.Secondary, + _ => RedundancyRole.Standalone, + }; + + private ServiceLevelSnapshot Emit(byte value, RedundancyTopology? 
topology) + { + var snap = new ServiceLevelSnapshot( + Value: value, + Band: ServiceLevelCalculator.Classify(value), + Topology: topology); + + if (value != _lastByte) + { + _lastByte = value; + OnStateChanged?.Invoke(snap); + } + return snap; + } + + private void MaybeFireServerUriArray(RedundancyTopology topology) + { + var current = topology.ServerUriArray(); + if (_lastServerUriArray is null || !current.SequenceEqual(_lastServerUriArray, StringComparer.Ordinal)) + { + _lastServerUriArray = current; + OnServerUriArrayChanged?.Invoke(current); + } + } +} + +/// <summary>Per-tick output of <see cref="RedundancyStatePublisher.ComputeAndPublish"/>.</summary> +public sealed record ServiceLevelSnapshot( + byte Value, + ServiceLevelBand Band, + RedundancyTopology? Topology); diff --git a/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/RedundancyStatePublisherTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/RedundancyStatePublisherTests.cs new file mode 100644 index 0000000..e61a308 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/RedundancyStatePublisherTests.cs @@ -0,0 +1,213 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using ZB.MOM.WW.OtOpcUa.Server.Redundancy; + +namespace ZB.MOM.WW.OtOpcUa.Server.Tests; + +[Trait("Category", "Unit")] +public sealed class RedundancyStatePublisherTests : IDisposable +{ + private readonly OtOpcUaConfigDbContext _db; + private readonly IDbContextFactory _dbFactory; + + public RedundancyStatePublisherTests() + { + var options = new DbContextOptionsBuilder() + .UseInMemoryDatabase($"redundancy-publisher-{Guid.NewGuid():N}") + .Options; + _db = new OtOpcUaConfigDbContext(options); + _dbFactory = new DbContextFactory(options); + } + + public void Dispose() => _db.Dispose(); + + private sealed class DbContextFactory(DbContextOptions options) + : IDbContextFactory + { + public OtOpcUaConfigDbContext 
CreateDbContext() => new(options); + } + + private async Task SeedAndInitialize(string selfNodeId, params (string id, RedundancyRole role, string appUri)[] nodes) + { + var cluster = new ServerCluster + { + ClusterId = "c1", + Name = "Warsaw-West", + Enterprise = "zb", + Site = "warsaw-west", + RedundancyMode = nodes.Length == 1 ? RedundancyMode.None : RedundancyMode.Warm, + CreatedBy = "test", + }; + _db.ServerClusters.Add(cluster); + foreach (var (id, role, appUri) in nodes) + { + _db.ClusterNodes.Add(new ClusterNode + { + NodeId = id, + ClusterId = "c1", + RedundancyRole = role, + Host = id.ToLowerInvariant(), + ApplicationUri = appUri, + CreatedBy = "test", + }); + } + await _db.SaveChangesAsync(); + + var coordinator = new RedundancyCoordinator(_dbFactory, NullLogger.Instance, selfNodeId, "c1"); + await coordinator.InitializeAsync(CancellationToken.None); + return coordinator; + } + + [Fact] + public async Task BeforeInit_Publishes_NoData() + { + // Coordinator not initialized — current topology is null. 
+ var coordinator = new RedundancyCoordinator(_dbFactory, NullLogger.Instance, "A", "c1"); + var publisher = new RedundancyStatePublisher( + coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), new PeerReachabilityTracker()); + + var snap = publisher.ComputeAndPublish(); + + snap.Band.ShouldBe(ServiceLevelBand.NoData); + snap.Value.ShouldBe((byte)1); + await Task.Yield(); + } + + [Fact] + public async Task AuthoritativePrimary_WhenHealthyAndPeerReachable() + { + var coordinator = await SeedAndInitialize("A", + ("A", RedundancyRole.Primary, "urn:A"), + ("B", RedundancyRole.Secondary, "urn:B")); + var peers = new PeerReachabilityTracker(); + peers.Update("B", PeerReachability.FullyHealthy); + + var publisher = new RedundancyStatePublisher( + coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers); + + var snap = publisher.ComputeAndPublish(); + + snap.Value.ShouldBe((byte)255); + snap.Band.ShouldBe(ServiceLevelBand.AuthoritativePrimary); + } + + [Fact] + public async Task IsolatedPrimary_WhenPeerUnreachable_RetainsAuthority() + { + var coordinator = await SeedAndInitialize("A", + ("A", RedundancyRole.Primary, "urn:A"), + ("B", RedundancyRole.Secondary, "urn:B")); + var peers = new PeerReachabilityTracker(); + peers.Update("B", PeerReachability.Unknown); + + var publisher = new RedundancyStatePublisher( + coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers); + + var snap = publisher.ComputeAndPublish(); + + snap.Value.ShouldBe((byte)230); + } + + [Fact] + public async Task MidApply_WhenLeaseOpen_Dominates() + { + var coordinator = await SeedAndInitialize("A", + ("A", RedundancyRole.Primary, "urn:A"), + ("B", RedundancyRole.Secondary, "urn:B")); + var leases = new ApplyLeaseRegistry(); + var peers = new PeerReachabilityTracker(); + peers.Update("B", PeerReachability.FullyHealthy); + + await using var lease = leases.BeginApplyLease(1, Guid.NewGuid()); + var publisher = new RedundancyStatePublisher( + coordinator, 
leases, new RecoveryStateManager(), peers); + + var snap = publisher.ComputeAndPublish(); + + snap.Value.ShouldBe((byte)200); + } + + [Fact] + public async Task SelfUnhealthy_Returns_NoData() + { + var coordinator = await SeedAndInitialize("A", + ("A", RedundancyRole.Primary, "urn:A"), + ("B", RedundancyRole.Secondary, "urn:B")); + var peers = new PeerReachabilityTracker(); + peers.Update("B", PeerReachability.FullyHealthy); + + var publisher = new RedundancyStatePublisher( + coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers, + selfHealthy: () => false); + + var snap = publisher.ComputeAndPublish(); + + snap.Value.ShouldBe((byte)1); + } + + [Fact] + public async Task OnStateChanged_FiresOnly_OnValueChange() + { + var coordinator = await SeedAndInitialize("A", + ("A", RedundancyRole.Primary, "urn:A"), + ("B", RedundancyRole.Secondary, "urn:B")); + var peers = new PeerReachabilityTracker(); + peers.Update("B", PeerReachability.FullyHealthy); + + var publisher = new RedundancyStatePublisher( + coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers); + + var emitCount = 0; + byte? 
lastEmitted = null; + publisher.OnStateChanged += snap => { emitCount++; lastEmitted = snap.Value; }; + + publisher.ComputeAndPublish(); // first tick — computes 255; _lastByte was seeded at 255, so no emit + peers.Update("B", PeerReachability.Unknown); + publisher.ComputeAndPublish(); // 255 → 230 transition — emits + publisher.ComputeAndPublish(); // still 230 — no emit + + emitCount.ShouldBe(1); + lastEmitted.ShouldBe((byte)230); + } + + [Fact] + public async Task OnServerUriArrayChanged_FiresOnce_PerTopology() + { + var coordinator = await SeedAndInitialize("A", + ("A", RedundancyRole.Primary, "urn:A"), + ("B", RedundancyRole.Secondary, "urn:B")); + var peers = new PeerReachabilityTracker(); + peers.Update("B", PeerReachability.FullyHealthy); + + var publisher = new RedundancyStatePublisher( + coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers); + + var emits = new List>(); + publisher.OnServerUriArrayChanged += arr => emits.Add(arr); + + publisher.ComputeAndPublish(); + publisher.ComputeAndPublish(); + publisher.ComputeAndPublish(); + + emits.Count.ShouldBe(1, "ServerUriArray event is edge-triggered on topology content change"); + emits[0].ShouldBe(["urn:A", "urn:B"]); + } + + [Fact] + public async Task Standalone_Cluster_IsAuthoritative_When_Healthy() + { + var coordinator = await SeedAndInitialize("A", + ("A", RedundancyRole.Standalone, "urn:A")); + var publisher = new RedundancyStatePublisher( + coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), new PeerReachabilityTracker()); + + var snap = publisher.ComputeAndPublish(); + + snap.Value.ShouldBe((byte)255); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/ZB.MOM.WW.OtOpcUa.Server.Tests.csproj b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/ZB.MOM.WW.OtOpcUa.Server.Tests.csproj index 753ad75..0ce2883 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/ZB.MOM.WW.OtOpcUa.Server.Tests.csproj +++ b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/ZB.MOM.WW.OtOpcUa.Server.Tests.csproj @@ 
-14,6 +14,7 @@ + all -- 2.49.1