lmxopcua/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/RedundancyCoordinator.cs
Joseph Doherty 84fe88fadb Phase 6.3 Stream A — RedundancyTopology + ClusterTopologyLoader + RedundancyCoordinator
Lands the data path that feeds the Phase 6.3 ServiceLevelCalculator shipped in
PR #89. OPC UA node wiring (ServiceLevel variable + ServerUriArray +
RedundancySupport) still deferred to task #147; peer-probe loops (Stream B.1/B.2
runtime layer beyond the calculator logic) deferred.

Server.Redundancy additions:
- RedundancyTopology record — immutable snapshot (ClusterId, SelfNodeId,
  SelfRole, Mode, Peers[], SelfApplicationUri). ServerUriArray() emits the
  OPC UA Part 4 §6.6.2.2 shape (self first, peers lexicographically by
  NodeId). RedundancyPeer record with per-peer Host/OpcUaPort/DashboardPort/
  ApplicationUri so the follow-up peer-probe loops know where to probe.
- ClusterTopologyLoader — pure fn from ServerCluster + ClusterNode[] to
  RedundancyTopology. Enforces Phase 6.3 Stream A.1 invariants:
    * At least one node per cluster.
    * At most 2 nodes (decision #83, v2.0 cap).
    * Every node belongs to the target cluster.
    * Unique ApplicationUri across the cluster (OPC UA Part 4 trust pin,
      decision #86).
    * At most 1 Primary per cluster in Warm/Hot modes (decision #84).
    * Self NodeId must be a member of the cluster.
  Violations throw InvalidTopologyException with a decision-ID-tagged message
  so operators know which invariant + what to fix.
- RedundancyCoordinator singleton — holds the current topology + IsTopologyValid
  flag. InitializeAsync throws on invariant violation (startup fails fast).
  RefreshAsync logs + flips IsTopologyValid=false (runtime won't tear down a
  running server; ServiceLevelCalculator falls to InvalidTopology band = 2
  which surfaces the problem to clients without crashing). New snapshots are
  published via Volatile.Write (a plain atomic reference store, not a true
  compare-and-swap) so readers always see a coherent snapshot.
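
The loader invariants and the ServerUriArray ordering described above can be
sketched roughly as follows. This is a minimal illustration, not the shipped
ClusterTopologyLoader: every type and member name ending in "Sketch", the
LoadServerUris helper, and the ordinal string comparison are assumptions made
for the example, and the real records carry more fields.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-ins for the entities named in the commit message.
public enum RoleSketch { Primary, Secondary }
public enum ModeSketch { None, Warm, Hot }
public sealed record NodeSketch(string NodeId, string ClusterId, RoleSketch Role, string ApplicationUri);

public sealed class InvalidTopologySketchException : Exception
{
    public InvalidTopologySketchException(string message) : base(message) { }
}

public static class TopologySketch
{
    // Checks the listed invariants, then returns ApplicationUris with self
    // first and peers ordered by NodeId (ordinal comparison is an assumption).
    public static IReadOnlyList<string> LoadServerUris(
        string selfNodeId, string clusterId, ModeSketch mode, IReadOnlyList<NodeSketch> nodes)
    {
        if (nodes.Count == 0)
            throw new InvalidTopologySketchException("Cluster has no nodes.");
        if (nodes.Count > 2)
            throw new InvalidTopologySketchException("More than 2 nodes (decision #83).");
        if (nodes.Any(n => n.ClusterId != clusterId))
            throw new InvalidTopologySketchException("Node belongs to another cluster.");
        if (nodes.Select(n => n.ApplicationUri).Distinct(StringComparer.Ordinal).Count() != nodes.Count)
            throw new InvalidTopologySketchException("Duplicate ApplicationUri (decision #86).");
        // Primary count is only enforced in Warm/Hot modes.
        if (mode != ModeSketch.None && nodes.Count(n => n.Role == RoleSketch.Primary) > 1)
            throw new InvalidTopologySketchException("More than one Primary (decision #84).");

        var self = nodes.FirstOrDefault(n => n.NodeId == selfNodeId)
            ?? throw new InvalidTopologySketchException("Self node is not a cluster member.");

        return new[] { self.ApplicationUri }
            .Concat(nodes.Where(n => n.NodeId != selfNodeId)
                         .OrderBy(n => n.NodeId, StringComparer.Ordinal)
                         .Select(n => n.ApplicationUri))
            .ToList();
    }
}

public static class Program
{
    public static void Main()
    {
        var nodes = new[]
        {
            new NodeSketch("node-b", "c1", RoleSketch.Secondary, "urn:b"),
            new NodeSketch("node-a", "c1", RoleSketch.Primary, "urn:a"),
        };
        // Self is node-b, so its URI comes first even though node-a sorts lower.
        Console.WriteLine(string.Join(", ", TopologySketch.LoadServerUris("node-b", "c1", ModeSketch.Warm, nodes)));
    }
}
```

With the 2-node cap in force there is at most one peer, so the sort only
matters if the cap is raised later.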

Tests (10 new ClusterTopologyLoaderTests):
- Single-node standalone loads + empty peer list.
- Two-node cluster loads self + peer.
- ServerUriArray puts self first + peers sort lexicographically.
- Empty-nodes throws.
- Self-not-in-cluster throws.
- Three-node cluster rejected with decision #83 message.
- Duplicate ApplicationUri rejected with decision #86 shape reference.
- Two Primaries in Warm mode rejected (decision #84 + runtime-band reference).
- Cross-cluster node rejected.
- None-mode allows any role mix (standalone clusters don't enforce Primary count).

Full solution dotnet test: 1178 passing (was 1168, +10). Pre-existing
Client.CLI Subscribe flake unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 11:24:14 -04:00


using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;

namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;

/// <summary>
/// Process-singleton holder of the current <see cref="RedundancyTopology"/>. Reads the
/// shared config DB at <see cref="InitializeAsync"/> time and re-reads on
/// <see cref="RefreshAsync"/> (called after <c>sp_PublishGeneration</c> completes so
/// operator role-swaps take effect without a process restart).
/// </summary>
/// <remarks>
/// <para>Per Phase 6.3 Stream A.1-A.2. The coordinator is the source of truth for the
/// <see cref="ServiceLevelCalculator"/> inputs: role (from topology), peer reachability
/// (from peer-probe loops, a Stream B.1/B.2 follow-up), apply-in-progress (from
/// <see cref="ApplyLeaseRegistry"/>), topology-valid (from invariant checks at load time
/// plus runtime detection of conflicting peer claims).</para>
///
/// <para>Topology refresh is an atomic reference swap: a new, immutable
/// <see cref="RedundancyTopology"/> instance replaces the old one via
/// <c>Volatile.Write</c>. Readers of <see cref="Current"/> always see a complete
/// snapshot, never a partial transition.</para>
/// </remarks>
public sealed class RedundancyCoordinator
{
    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbContextFactory;
    private readonly ILogger<RedundancyCoordinator> _logger;
    private readonly string _selfNodeId;
    private readonly string _selfClusterId;

    private RedundancyTopology? _current;
    private bool _topologyValid = true;

    public RedundancyCoordinator(
        IDbContextFactory<OtOpcUaConfigDbContext> dbContextFactory,
        ILogger<RedundancyCoordinator> logger,
        string selfNodeId,
        string selfClusterId)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(selfNodeId);
        ArgumentException.ThrowIfNullOrWhiteSpace(selfClusterId);

        _dbContextFactory = dbContextFactory;
        _logger = logger;
        _selfNodeId = selfNodeId;
        _selfClusterId = selfClusterId;
    }

    /// <summary>Last-loaded topology; null before <see cref="InitializeAsync"/> completes.</summary>
    public RedundancyTopology? Current => Volatile.Read(ref _current);

    /// <summary>
    /// True when the last load/refresh completed without an invariant violation; false
    /// forces <see cref="ServiceLevelCalculator"/> into the <see cref="ServiceLevelBand.InvalidTopology"/>
    /// band regardless of other inputs.
    /// </summary>
    public bool IsTopologyValid => Volatile.Read(ref _topologyValid);

    /// <summary>Load the topology for the first time. Throws on invariant violation.</summary>
    public async Task InitializeAsync(CancellationToken ct)
    {
        await RefreshInternalAsync(throwOnInvalid: true, ct).ConfigureAwait(false);
    }

    /// <summary>
    /// Re-read the topology from the shared DB. Called after <c>sp_PublishGeneration</c>
    /// completes or after an Admin-triggered role-swap. Does not throw on invariant
    /// violation (including a missing cluster row): it logs and flips
    /// <see cref="IsTopologyValid"/> to false so the calculator returns
    /// <see cref="ServiceLevelBand.InvalidTopology"/> = 2, and <see cref="Current"/>
    /// keeps the last successfully loaded topology. Database and cancellation
    /// exceptions still propagate.
    /// </summary>
    public async Task RefreshAsync(CancellationToken ct)
    {
        await RefreshInternalAsync(throwOnInvalid: false, ct).ConfigureAwait(false);
    }

    private async Task RefreshInternalAsync(bool throwOnInvalid, CancellationToken ct)
    {
        await using var db = await _dbContextFactory.CreateDbContextAsync(ct).ConfigureAwait(false);

        try
        {
            // The cluster lookup sits inside the try so that a missing row is handled
            // like any other invariant violation instead of escaping RefreshAsync.
            var cluster = await db.ServerClusters.AsNoTracking()
                .FirstOrDefaultAsync(c => c.ClusterId == _selfClusterId, ct).ConfigureAwait(false)
                ?? throw new InvalidTopologyException($"Cluster '{_selfClusterId}' not found in config DB.");

            // Only enabled nodes take part in the topology.
            var nodes = await db.ClusterNodes.AsNoTracking()
                .Where(n => n.ClusterId == _selfClusterId && n.Enabled)
                .ToListAsync(ct).ConfigureAwait(false);

            var topology = ClusterTopologyLoader.Load(_selfNodeId, cluster, nodes);

            // Publish the new snapshot first, then mark it valid.
            Volatile.Write(ref _current, topology);
            Volatile.Write(ref _topologyValid, true);

            _logger.LogInformation(
                "Redundancy topology loaded: cluster={Cluster} self={Self} role={Role} mode={Mode} peers={PeerCount}",
                topology.ClusterId, topology.SelfNodeId, topology.SelfRole, topology.Mode, topology.PeerCount);
        }
        catch (InvalidTopologyException ex)
        {
            // Keep the previous snapshot in place; only the validity flag changes.
            Volatile.Write(ref _topologyValid, false);
            _logger.LogError(ex,
                "Redundancy topology invariant violation for cluster {Cluster}: {Reason}",
                _selfClusterId, ex.Message);

            if (throwOnInvalid) throw;
        }
    }
}
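
The refresh semantics can be hard to see from the coordinator alone, so here is a small standalone sketch of what a caller observes after a failed refresh: the last good snapshot stays readable while the validity flag drops to false. `SnapshotSketch`, `SnapshotHolderSketch`, and the use of `InvalidOperationException` as a stand-in for `InvalidTopologyException` are all hypothetical names chosen for this illustration; the sketch has no database or logging and is not the coordinator itself.

```csharp
using System;
using System.Threading;

// Hypothetical immutable snapshot standing in for RedundancyTopology.
public sealed record SnapshotSketch(string Role, int PeerCount);

public sealed class SnapshotHolderSketch
{
    private SnapshotSketch? _current;
    private bool _valid = true;

    public SnapshotSketch? Current => Volatile.Read(ref _current);
    public bool IsValid => Volatile.Read(ref _valid);

    // load() plays the role of the DB read plus invariant checks.
    public void Refresh(Func<SnapshotSketch> load, bool throwOnInvalid)
    {
        try
        {
            var next = load();
            Volatile.Write(ref _current, next);   // publish the whole snapshot at once
            Volatile.Write(ref _valid, true);
        }
        catch (InvalidOperationException)
        {
            // The previous snapshot is left untouched; only the flag flips.
            Volatile.Write(ref _valid, false);
            if (throwOnInvalid) throw;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var holder = new SnapshotHolderSketch();

        // Startup path: a bad load would throw here and stop the process.
        holder.Refresh(() => new SnapshotSketch("Primary", 1), throwOnInvalid: true);

        // Runtime path: a bad load is swallowed and surfaced through IsValid.
        holder.Refresh(() => throw new InvalidOperationException("two primaries"), throwOnInvalid: false);

        Console.WriteLine($"{holder.Current?.Role} valid={holder.IsValid}");
        // prints "Primary valid=False"
    }
}
```

Because the snapshot and the flag are two separate fields, a reader that needs both should expect to see them change one after the other rather than together.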