Phase 6.3 Stream C core - RedundancyStatePublisher + PeerReachability #99
42
src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/PeerReachability.cs
Normal file
42
src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/PeerReachability.cs
Normal file
@@ -0,0 +1,42 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;
|
||||
|
||||
/// <summary>
|
||||
/// Latest observed reachability of the peer node per the Phase 6.3 Stream B.1/B.2 two-layer
|
||||
/// probe model. HTTP layer is the fast-fail; UA layer is authoritative.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Fed into the <see cref="ServiceLevelCalculator"/> as <c>peerHttpHealthy</c> +
|
||||
/// <c>peerUaHealthy</c>. The concrete probe loops (<c>PeerHttpProbeLoop</c> +
|
||||
/// <c>PeerUaProbeLoop</c>) live in a Stream B runtime follow-up — this type is the
|
||||
/// contract the publisher reads; probers write via
|
||||
/// <see cref="PeerReachabilityTracker"/>.
|
||||
/// </remarks>
|
||||
public sealed record PeerReachability(bool HttpHealthy, bool UaHealthy)
|
||||
{
|
||||
public static readonly PeerReachability Unknown = new(false, false);
|
||||
public static readonly PeerReachability FullyHealthy = new(true, true);
|
||||
|
||||
/// <summary>True when both probes report healthy — the <c>ServiceLevelCalculator</c>'s peerReachable gate.</summary>
|
||||
public bool BothHealthy => HttpHealthy && UaHealthy;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Thread-safe holder of the latest <see cref="PeerReachability"/> per peer NodeId. Probe
|
||||
/// loops call <see cref="Update"/>; the <see cref="RedundancyStatePublisher"/> reads via
|
||||
/// <see cref="Get"/>.
|
||||
/// </summary>
|
||||
public sealed class PeerReachabilityTracker
|
||||
{
|
||||
private readonly System.Collections.Concurrent.ConcurrentDictionary<string, PeerReachability> _byPeer =
|
||||
new(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
public void Update(string peerNodeId, PeerReachability reachability)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(peerNodeId);
|
||||
_byPeer[peerNodeId] = reachability ?? throw new ArgumentNullException(nameof(reachability));
|
||||
}
|
||||
|
||||
/// <summary>Current reachability for a peer. Returns <see cref="PeerReachability.Unknown"/> when not yet probed.</summary>
|
||||
public PeerReachability Get(string peerNodeId) =>
|
||||
_byPeer.TryGetValue(peerNodeId, out var r) ? r : PeerReachability.Unknown;
|
||||
}
|
||||
@@ -0,0 +1,142 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;
|
||||
|
||||
/// <summary>
|
||||
/// Orchestrates Phase 6.3 Stream C: feeds the <see cref="ServiceLevelCalculator"/> with the
|
||||
/// current (topology, peer reachability, apply-in-progress, recovery dwell, self health)
|
||||
/// inputs and emits the resulting <see cref="byte"/> + labelled <see cref="ServiceLevelBand"/>
|
||||
/// to subscribers. The OPC UA <c>ServiceLevel</c> variable node consumes this via
|
||||
/// <see cref="OnStateChanged"/> on every tick.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Pure orchestration — no background timer, no OPC UA stack dep. The caller (a
|
||||
/// HostedService in a future PR, or a test) drives <see cref="ComputeAndPublish"/> at
|
||||
/// whatever cadence is appropriate. Each call reads the inputs + recomputes the ServiceLevel
|
||||
/// byte; state is fired on the <see cref="OnStateChanged"/> event when the byte differs from
|
||||
/// the last emitted value (edge-triggered). The <see cref="OnServerUriArrayChanged"/> event
|
||||
/// fires whenever the topology's <c>ServerUriArray</c> content changes.
|
||||
/// </remarks>
|
||||
public sealed class RedundancyStatePublisher
|
||||
{
|
||||
private readonly RedundancyCoordinator _coordinator;
|
||||
private readonly ApplyLeaseRegistry _leases;
|
||||
private readonly RecoveryStateManager _recovery;
|
||||
private readonly PeerReachabilityTracker _peers;
|
||||
private readonly Func<bool> _selfHealthy;
|
||||
private readonly Func<bool> _operatorMaintenance;
|
||||
private byte _lastByte = 255; // start at Authoritative — harmless before first tick
|
||||
private IReadOnlyList<string>? _lastServerUriArray;
|
||||
|
||||
public RedundancyStatePublisher(
|
||||
RedundancyCoordinator coordinator,
|
||||
ApplyLeaseRegistry leases,
|
||||
RecoveryStateManager recovery,
|
||||
PeerReachabilityTracker peers,
|
||||
Func<bool>? selfHealthy = null,
|
||||
Func<bool>? operatorMaintenance = null)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(coordinator);
|
||||
ArgumentNullException.ThrowIfNull(leases);
|
||||
ArgumentNullException.ThrowIfNull(recovery);
|
||||
ArgumentNullException.ThrowIfNull(peers);
|
||||
|
||||
_coordinator = coordinator;
|
||||
_leases = leases;
|
||||
_recovery = recovery;
|
||||
_peers = peers;
|
||||
_selfHealthy = selfHealthy ?? (() => true);
|
||||
_operatorMaintenance = operatorMaintenance ?? (() => false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Fires with the current ServiceLevel byte + band on every call to
|
||||
/// <see cref="ComputeAndPublish"/> when the byte differs from the previously-emitted one.
|
||||
/// </summary>
|
||||
public event Action<ServiceLevelSnapshot>? OnStateChanged;
|
||||
|
||||
/// <summary>
|
||||
/// Fires when the cluster's ServerUriArray (self + peers) content changes — e.g. an
|
||||
/// operator adds or removes a peer. Consumer is the OPC UA <c>ServerUriArray</c>
|
||||
/// variable node in Stream C.2.
|
||||
/// </summary>
|
||||
public event Action<IReadOnlyList<string>>? OnServerUriArrayChanged;
|
||||
|
||||
/// <summary>Snapshot of the last-published ServiceLevel byte — diagnostics + tests.</summary>
|
||||
public byte LastByte => _lastByte;
|
||||
|
||||
/// <summary>
|
||||
/// Compute the current ServiceLevel + emit change events if anything moved. Caller
|
||||
/// drives cadence — a 1 s tick in production is reasonable; tests drive it directly.
|
||||
/// </summary>
|
||||
public ServiceLevelSnapshot ComputeAndPublish()
|
||||
{
|
||||
var topology = _coordinator.Current;
|
||||
if (topology is null)
|
||||
{
|
||||
// Not yet initialized — surface NoData so clients don't treat us as authoritative.
|
||||
return Emit((byte)ServiceLevelBand.NoData, null);
|
||||
}
|
||||
|
||||
// Aggregate peer reachability. For 2-node v2.0 clusters there is at most one peer;
|
||||
// treat "all peers healthy" as the boolean input to the calculator.
|
||||
var peerReachable = topology.Peers.All(p => _peers.Get(p.NodeId).BothHealthy);
|
||||
var peerUaHealthy = topology.Peers.All(p => _peers.Get(p.NodeId).UaHealthy);
|
||||
var peerHttpHealthy = topology.Peers.All(p => _peers.Get(p.NodeId).HttpHealthy);
|
||||
|
||||
var role = MapRole(topology.SelfRole);
|
||||
|
||||
var value = ServiceLevelCalculator.Compute(
|
||||
role: role,
|
||||
selfHealthy: _selfHealthy(),
|
||||
peerUaHealthy: peerUaHealthy,
|
||||
peerHttpHealthy: peerHttpHealthy,
|
||||
applyInProgress: _leases.IsApplyInProgress,
|
||||
recoveryDwellMet: _recovery.IsDwellMet(),
|
||||
topologyValid: _coordinator.IsTopologyValid,
|
||||
operatorMaintenance: _operatorMaintenance());
|
||||
|
||||
MaybeFireServerUriArray(topology);
|
||||
return Emit(value, topology);
|
||||
}
|
||||
|
||||
private static RedundancyRole MapRole(RedundancyRole role) => role switch
|
||||
{
|
||||
// Standalone is serving; treat as Primary for the matrix since the calculator
|
||||
// already special-cases Standalone inside its Compute.
|
||||
RedundancyRole.Primary => RedundancyRole.Primary,
|
||||
RedundancyRole.Secondary => RedundancyRole.Secondary,
|
||||
_ => RedundancyRole.Standalone,
|
||||
};
|
||||
|
||||
private ServiceLevelSnapshot Emit(byte value, RedundancyTopology? topology)
|
||||
{
|
||||
var snap = new ServiceLevelSnapshot(
|
||||
Value: value,
|
||||
Band: ServiceLevelCalculator.Classify(value),
|
||||
Topology: topology);
|
||||
|
||||
if (value != _lastByte)
|
||||
{
|
||||
_lastByte = value;
|
||||
OnStateChanged?.Invoke(snap);
|
||||
}
|
||||
return snap;
|
||||
}
|
||||
|
||||
private void MaybeFireServerUriArray(RedundancyTopology topology)
|
||||
{
|
||||
var current = topology.ServerUriArray();
|
||||
if (_lastServerUriArray is null || !current.SequenceEqual(_lastServerUriArray, StringComparer.Ordinal))
|
||||
{
|
||||
_lastServerUriArray = current;
|
||||
OnServerUriArrayChanged?.Invoke(current);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Per-tick output of <see cref="RedundancyStatePublisher.ComputeAndPublish"/>.</summary>
|
||||
public sealed record ServiceLevelSnapshot(
|
||||
byte Value,
|
||||
ServiceLevelBand Band,
|
||||
RedundancyTopology? Topology);
|
||||
@@ -0,0 +1,213 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.Redundancy;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class RedundancyStatePublisherTests : IDisposable
|
||||
{
|
||||
private readonly OtOpcUaConfigDbContext _db;
|
||||
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
|
||||
|
||||
public RedundancyStatePublisherTests()
|
||||
{
|
||||
var options = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
|
||||
.UseInMemoryDatabase($"redundancy-publisher-{Guid.NewGuid():N}")
|
||||
.Options;
|
||||
_db = new OtOpcUaConfigDbContext(options);
|
||||
_dbFactory = new DbContextFactory(options);
|
||||
}
|
||||
|
||||
public void Dispose() => _db.Dispose();
|
||||
|
||||
private sealed class DbContextFactory(DbContextOptions<OtOpcUaConfigDbContext> options)
|
||||
: IDbContextFactory<OtOpcUaConfigDbContext>
|
||||
{
|
||||
public OtOpcUaConfigDbContext CreateDbContext() => new(options);
|
||||
}
|
||||
|
||||
private async Task<RedundancyCoordinator> SeedAndInitialize(string selfNodeId, params (string id, RedundancyRole role, string appUri)[] nodes)
|
||||
{
|
||||
var cluster = new ServerCluster
|
||||
{
|
||||
ClusterId = "c1",
|
||||
Name = "Warsaw-West",
|
||||
Enterprise = "zb",
|
||||
Site = "warsaw-west",
|
||||
RedundancyMode = nodes.Length == 1 ? RedundancyMode.None : RedundancyMode.Warm,
|
||||
CreatedBy = "test",
|
||||
};
|
||||
_db.ServerClusters.Add(cluster);
|
||||
foreach (var (id, role, appUri) in nodes)
|
||||
{
|
||||
_db.ClusterNodes.Add(new ClusterNode
|
||||
{
|
||||
NodeId = id,
|
||||
ClusterId = "c1",
|
||||
RedundancyRole = role,
|
||||
Host = id.ToLowerInvariant(),
|
||||
ApplicationUri = appUri,
|
||||
CreatedBy = "test",
|
||||
});
|
||||
}
|
||||
await _db.SaveChangesAsync();
|
||||
|
||||
var coordinator = new RedundancyCoordinator(_dbFactory, NullLogger<RedundancyCoordinator>.Instance, selfNodeId, "c1");
|
||||
await coordinator.InitializeAsync(CancellationToken.None);
|
||||
return coordinator;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task BeforeInit_Publishes_NoData()
|
||||
{
|
||||
// Coordinator not initialized — current topology is null.
|
||||
var coordinator = new RedundancyCoordinator(_dbFactory, NullLogger<RedundancyCoordinator>.Instance, "A", "c1");
|
||||
var publisher = new RedundancyStatePublisher(
|
||||
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), new PeerReachabilityTracker());
|
||||
|
||||
var snap = publisher.ComputeAndPublish();
|
||||
|
||||
snap.Band.ShouldBe(ServiceLevelBand.NoData);
|
||||
snap.Value.ShouldBe((byte)1);
|
||||
await Task.Yield();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AuthoritativePrimary_WhenHealthyAndPeerReachable()
|
||||
{
|
||||
var coordinator = await SeedAndInitialize("A",
|
||||
("A", RedundancyRole.Primary, "urn:A"),
|
||||
("B", RedundancyRole.Secondary, "urn:B"));
|
||||
var peers = new PeerReachabilityTracker();
|
||||
peers.Update("B", PeerReachability.FullyHealthy);
|
||||
|
||||
var publisher = new RedundancyStatePublisher(
|
||||
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers);
|
||||
|
||||
var snap = publisher.ComputeAndPublish();
|
||||
|
||||
snap.Value.ShouldBe((byte)255);
|
||||
snap.Band.ShouldBe(ServiceLevelBand.AuthoritativePrimary);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task IsolatedPrimary_WhenPeerUnreachable_RetainsAuthority()
|
||||
{
|
||||
var coordinator = await SeedAndInitialize("A",
|
||||
("A", RedundancyRole.Primary, "urn:A"),
|
||||
("B", RedundancyRole.Secondary, "urn:B"));
|
||||
var peers = new PeerReachabilityTracker();
|
||||
peers.Update("B", PeerReachability.Unknown);
|
||||
|
||||
var publisher = new RedundancyStatePublisher(
|
||||
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers);
|
||||
|
||||
var snap = publisher.ComputeAndPublish();
|
||||
|
||||
snap.Value.ShouldBe((byte)230);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MidApply_WhenLeaseOpen_Dominates()
|
||||
{
|
||||
var coordinator = await SeedAndInitialize("A",
|
||||
("A", RedundancyRole.Primary, "urn:A"),
|
||||
("B", RedundancyRole.Secondary, "urn:B"));
|
||||
var leases = new ApplyLeaseRegistry();
|
||||
var peers = new PeerReachabilityTracker();
|
||||
peers.Update("B", PeerReachability.FullyHealthy);
|
||||
|
||||
await using var lease = leases.BeginApplyLease(1, Guid.NewGuid());
|
||||
var publisher = new RedundancyStatePublisher(
|
||||
coordinator, leases, new RecoveryStateManager(), peers);
|
||||
|
||||
var snap = publisher.ComputeAndPublish();
|
||||
|
||||
snap.Value.ShouldBe((byte)200);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SelfUnhealthy_Returns_NoData()
|
||||
{
|
||||
var coordinator = await SeedAndInitialize("A",
|
||||
("A", RedundancyRole.Primary, "urn:A"),
|
||||
("B", RedundancyRole.Secondary, "urn:B"));
|
||||
var peers = new PeerReachabilityTracker();
|
||||
peers.Update("B", PeerReachability.FullyHealthy);
|
||||
|
||||
var publisher = new RedundancyStatePublisher(
|
||||
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers,
|
||||
selfHealthy: () => false);
|
||||
|
||||
var snap = publisher.ComputeAndPublish();
|
||||
|
||||
snap.Value.ShouldBe((byte)1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OnStateChanged_FiresOnly_OnValueChange()
|
||||
{
|
||||
var coordinator = await SeedAndInitialize("A",
|
||||
("A", RedundancyRole.Primary, "urn:A"),
|
||||
("B", RedundancyRole.Secondary, "urn:B"));
|
||||
var peers = new PeerReachabilityTracker();
|
||||
peers.Update("B", PeerReachability.FullyHealthy);
|
||||
|
||||
var publisher = new RedundancyStatePublisher(
|
||||
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers);
|
||||
|
||||
var emitCount = 0;
|
||||
byte? lastEmitted = null;
|
||||
publisher.OnStateChanged += snap => { emitCount++; lastEmitted = snap.Value; };
|
||||
|
||||
publisher.ComputeAndPublish(); // first tick — emits 255 since _lastByte was seeded at 255; no change
|
||||
peers.Update("B", PeerReachability.Unknown);
|
||||
publisher.ComputeAndPublish(); // 255 → 230 transition — emits
|
||||
publisher.ComputeAndPublish(); // still 230 — no emit
|
||||
|
||||
emitCount.ShouldBe(1);
|
||||
lastEmitted.ShouldBe((byte)230);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OnServerUriArrayChanged_FiresOnce_PerTopology()
|
||||
{
|
||||
var coordinator = await SeedAndInitialize("A",
|
||||
("A", RedundancyRole.Primary, "urn:A"),
|
||||
("B", RedundancyRole.Secondary, "urn:B"));
|
||||
var peers = new PeerReachabilityTracker();
|
||||
peers.Update("B", PeerReachability.FullyHealthy);
|
||||
|
||||
var publisher = new RedundancyStatePublisher(
|
||||
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers);
|
||||
|
||||
var emits = new List<IReadOnlyList<string>>();
|
||||
publisher.OnServerUriArrayChanged += arr => emits.Add(arr);
|
||||
|
||||
publisher.ComputeAndPublish();
|
||||
publisher.ComputeAndPublish();
|
||||
publisher.ComputeAndPublish();
|
||||
|
||||
emits.Count.ShouldBe(1, "ServerUriArray event is edge-triggered on topology content change");
|
||||
emits[0].ShouldBe(["urn:A", "urn:B"]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Standalone_Cluster_IsAuthoritative_When_Healthy()
|
||||
{
|
||||
var coordinator = await SeedAndInitialize("A",
|
||||
("A", RedundancyRole.Standalone, "urn:A"));
|
||||
var publisher = new RedundancyStatePublisher(
|
||||
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), new PeerReachabilityTracker());
|
||||
|
||||
var snap = publisher.ComputeAndPublish();
|
||||
|
||||
snap.Value.ShouldBe((byte)255);
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,7 @@
|
||||
<PackageReference Include="Shouldly" Version="4.3.0"/>
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0"/>
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0"/>
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="10.0.0"/>
|
||||
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client" Version="1.5.374.126"/>
|
||||
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
|
||||
Reference in New Issue
Block a user