Merge pull request (#98) - Phase 6.3 Stream A topology loader

This commit was merged in pull request #98.
This commit is contained in:
2026-04-19 11:26:11 -04:00
4 changed files with 421 additions and 0 deletions

View File

@@ -0,0 +1,96 @@
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;
/// <summary>
/// Pure-function mapper from the shared config DB's <see cref="ServerCluster"/> +
/// <see cref="ClusterNode"/> rows to an immutable <see cref="RedundancyTopology"/>.
/// Validates Phase 6.3 Stream A.1 invariants and throws
/// <see cref="InvalidTopologyException"/> on violation so the coordinator can fail startup
/// fast with a clear message rather than boot into an ambiguous state.
/// </summary>
/// <remarks>
/// Stateless — the caller owns the DB round-trip + hands rows in. Keeping it pure makes
/// the invariant matrix testable without EF or SQL Server.
/// </remarks>
public static class ClusterTopologyLoader
{
/// <summary>Build a topology snapshot for the given self node. Throws on invariant violation.</summary>
public static RedundancyTopology Load(string selfNodeId, ServerCluster cluster, IReadOnlyList<ClusterNode> nodes)
{
ArgumentException.ThrowIfNullOrWhiteSpace(selfNodeId);
ArgumentNullException.ThrowIfNull(cluster);
ArgumentNullException.ThrowIfNull(nodes);
ValidateClusterShape(cluster, nodes);
ValidateUniqueApplicationUris(nodes);
ValidatePrimaryCount(cluster, nodes);
var self = nodes.FirstOrDefault(n => string.Equals(n.NodeId, selfNodeId, StringComparison.OrdinalIgnoreCase))
?? throw new InvalidTopologyException(
$"Self node '{selfNodeId}' is not a member of cluster '{cluster.ClusterId}'. " +
$"Members: {string.Join(", ", nodes.Select(n => n.NodeId))}.");
var peers = nodes
.Where(n => !string.Equals(n.NodeId, selfNodeId, StringComparison.OrdinalIgnoreCase))
.Select(n => new RedundancyPeer(
NodeId: n.NodeId,
Role: n.RedundancyRole,
Host: n.Host,
OpcUaPort: n.OpcUaPort,
DashboardPort: n.DashboardPort,
ApplicationUri: n.ApplicationUri))
.ToList();
return new RedundancyTopology(
ClusterId: cluster.ClusterId,
SelfNodeId: self.NodeId,
SelfRole: self.RedundancyRole,
Mode: cluster.RedundancyMode,
Peers: peers,
SelfApplicationUri: self.ApplicationUri);
}
private static void ValidateClusterShape(ServerCluster cluster, IReadOnlyList<ClusterNode> nodes)
{
if (nodes.Count == 0)
throw new InvalidTopologyException($"Cluster '{cluster.ClusterId}' has zero nodes.");
// Decision #83 — v2.0 caps clusters at two nodes.
if (nodes.Count > 2)
throw new InvalidTopologyException(
$"Cluster '{cluster.ClusterId}' has {nodes.Count} nodes. v2.0 supports at most 2 nodes per cluster (decision #83).");
// Every node must belong to the given cluster.
var wrongCluster = nodes.FirstOrDefault(n =>
!string.Equals(n.ClusterId, cluster.ClusterId, StringComparison.OrdinalIgnoreCase));
if (wrongCluster is not null)
throw new InvalidTopologyException(
$"Node '{wrongCluster.NodeId}' belongs to cluster '{wrongCluster.ClusterId}', not '{cluster.ClusterId}'.");
}
private static void ValidateUniqueApplicationUris(IReadOnlyList<ClusterNode> nodes)
{
var dup = nodes
.GroupBy(n => n.ApplicationUri, StringComparer.Ordinal)
.FirstOrDefault(g => g.Count() > 1);
if (dup is not null)
throw new InvalidTopologyException(
$"Nodes {string.Join(", ", dup.Select(n => n.NodeId))} share ApplicationUri '{dup.Key}'. " +
$"OPC UA Part 4 requires unique ApplicationUri per server — clients pin trust here (decision #86).");
}
private static void ValidatePrimaryCount(ServerCluster cluster, IReadOnlyList<ClusterNode> nodes)
{
// Standalone mode: any role is fine. Warm / Hot: at most one Primary per cluster.
if (cluster.RedundancyMode == RedundancyMode.None) return;
var primaries = nodes.Count(n => n.RedundancyRole == RedundancyRole.Primary);
if (primaries > 1)
throw new InvalidTopologyException(
$"Cluster '{cluster.ClusterId}' has {primaries} Primary nodes in redundancy mode {cluster.RedundancyMode}. " +
$"At most one Primary per cluster (decision #84). Runtime detects and demotes both to ServiceLevel 2 " +
$"per the 8-state matrix; startup fails fast to surface the misconfiguration earlier.");
}
}

View File

@@ -0,0 +1,107 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;
/// <summary>
/// Process-singleton holder of the current <see cref="RedundancyTopology"/>. Reads the
/// shared config DB at <see cref="InitializeAsync"/> time + re-reads on
/// <see cref="RefreshAsync"/> (called after <c>sp_PublishGeneration</c> completes so
/// operator role-swaps take effect without a process restart).
/// </summary>
/// <remarks>
/// <para>Per Phase 6.3 Stream A.1-A.2. The coordinator is the source of truth for the
/// <see cref="ServiceLevelCalculator"/> inputs: role (from topology), peer reachability
/// (from peer-probe loops — Stream B.1/B.2 follow-up), apply-in-progress (from
/// <see cref="ApplyLeaseRegistry"/>), topology-valid (from invariant checks at load time
/// + runtime detection of conflicting peer claims).</para>
///
/// <para>Topology refresh is CAS-style: a new <see cref="RedundancyTopology"/> instance
/// replaces the old one atomically via <see cref="Interlocked.Exchange{T}"/>. Readers
/// always see a coherent snapshot — never a partial transition.</para>
/// </remarks>
public sealed class RedundancyCoordinator
{
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbContextFactory;
private readonly ILogger<RedundancyCoordinator> _logger;
private readonly string _selfNodeId;
private readonly string _selfClusterId;
private RedundancyTopology? _current;
private bool _topologyValid = true;
public RedundancyCoordinator(
IDbContextFactory<OtOpcUaConfigDbContext> dbContextFactory,
ILogger<RedundancyCoordinator> logger,
string selfNodeId,
string selfClusterId)
{
ArgumentException.ThrowIfNullOrWhiteSpace(selfNodeId);
ArgumentException.ThrowIfNullOrWhiteSpace(selfClusterId);
_dbContextFactory = dbContextFactory;
_logger = logger;
_selfNodeId = selfNodeId;
_selfClusterId = selfClusterId;
}
/// <summary>Last-loaded topology; null before <see cref="InitializeAsync"/> completes.</summary>
public RedundancyTopology? Current => Volatile.Read(ref _current);
/// <summary>
/// True when the last load/refresh completed without an invariant violation; false
/// forces <see cref="ServiceLevelCalculator"/> into the <see cref="ServiceLevelBand.InvalidTopology"/>
/// band regardless of other inputs.
/// </summary>
public bool IsTopologyValid => Volatile.Read(ref _topologyValid);
/// <summary>Load the topology for the first time. Throws on invariant violation.</summary>
public async Task InitializeAsync(CancellationToken ct)
{
await RefreshInternalAsync(throwOnInvalid: true, ct).ConfigureAwait(false);
}
/// <summary>
/// Re-read the topology from the shared DB. Called after <c>sp_PublishGeneration</c>
/// completes or after an Admin-triggered role-swap. Never throws — on invariant
/// violation it logs + flips <see cref="IsTopologyValid"/> false so the calculator
/// returns <see cref="ServiceLevelBand.InvalidTopology"/> = 2.
/// </summary>
public async Task RefreshAsync(CancellationToken ct)
{
await RefreshInternalAsync(throwOnInvalid: false, ct).ConfigureAwait(false);
}
private async Task RefreshInternalAsync(bool throwOnInvalid, CancellationToken ct)
{
await using var db = await _dbContextFactory.CreateDbContextAsync(ct).ConfigureAwait(false);
var cluster = await db.ServerClusters.AsNoTracking()
.FirstOrDefaultAsync(c => c.ClusterId == _selfClusterId, ct).ConfigureAwait(false)
?? throw new InvalidTopologyException($"Cluster '{_selfClusterId}' not found in config DB.");
var nodes = await db.ClusterNodes.AsNoTracking()
.Where(n => n.ClusterId == _selfClusterId && n.Enabled)
.ToListAsync(ct).ConfigureAwait(false);
try
{
var topology = ClusterTopologyLoader.Load(_selfNodeId, cluster, nodes);
Volatile.Write(ref _current, topology);
Volatile.Write(ref _topologyValid, true);
_logger.LogInformation(
"Redundancy topology loaded: cluster={Cluster} self={Self} role={Role} mode={Mode} peers={PeerCount}",
topology.ClusterId, topology.SelfNodeId, topology.SelfRole, topology.Mode, topology.PeerCount);
}
catch (InvalidTopologyException ex)
{
Volatile.Write(ref _topologyValid, false);
_logger.LogError(ex,
"Redundancy topology invariant violation for cluster {Cluster}: {Reason}",
_selfClusterId, ex.Message);
if (throwOnInvalid) throw;
}
}
}

View File

@@ -0,0 +1,55 @@
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;
/// <summary>
/// Snapshot of the cluster topology the <see cref="RedundancyCoordinator"/> holds. Read
/// once at startup + refreshed on publish-generation notification. Immutable — every
/// refresh produces a new instance so observers can compare identity-equality to detect
/// topology change.
/// </summary>
/// <remarks>
/// Per Phase 6.3 Stream A.1. Invariants enforced by the loader (see
/// <see cref="ClusterTopologyLoader"/>): at most one Primary per cluster for
/// WarmActive/Hot redundancy modes; every node has a unique ApplicationUri (OPC UA
/// Part 4 requirement — clients pin trust here); at most 2 nodes total per cluster
/// (decision #83).
/// </remarks>
public sealed record RedundancyTopology(
string ClusterId,
string SelfNodeId,
RedundancyRole SelfRole,
RedundancyMode Mode,
IReadOnlyList<RedundancyPeer> Peers,
string SelfApplicationUri)
{
/// <summary>Peer count — 0 for a standalone (single-node) cluster, 1 for v2 two-node clusters.</summary>
public int PeerCount => Peers.Count;
/// <summary>
/// ServerUriArray shape per OPC UA Part 4 §6.6.2.2 — self first, peers in stable
/// deterministic order (lexicographic by NodeId), self's ApplicationUri always at index 0.
/// </summary>
public IReadOnlyList<string> ServerUriArray() =>
new[] { SelfApplicationUri }
.Concat(Peers.OrderBy(p => p.NodeId, StringComparer.OrdinalIgnoreCase).Select(p => p.ApplicationUri))
.ToList();
}
/// <summary>One peer in the cluster (every node other than self).</summary>
/// <param name="NodeId">Peer's stable logical NodeId (e.g. <c>"LINE3-OPCUA-B"</c>).</param>
/// <param name="Role">Peer's declared redundancy role from the shared config DB.</param>
/// <param name="Host">Peer's hostname / IP — drives the health-probe target.</param>
/// <param name="OpcUaPort">Peer's OPC UA endpoint port.</param>
/// <param name="DashboardPort">Peer's dashboard / health-endpoint port.</param>
/// <param name="ApplicationUri">Peer's declared ApplicationUri (carried in <see cref="RedundancyTopology.ServerUriArray"/>).</param>
public sealed record RedundancyPeer(
string NodeId,
RedundancyRole Role,
string Host,
int OpcUaPort,
int DashboardPort,
string ApplicationUri);
/// <summary>Thrown when the loader detects a topology-invariant violation at startup or refresh.</summary>
public sealed class InvalidTopologyException(string message) : Exception(message);

View File

@@ -0,0 +1,163 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Server.Redundancy;
namespace ZB.MOM.WW.OtOpcUa.Server.Tests;
[Trait("Category", "Unit")]
public sealed class ClusterTopologyLoaderTests
{
private static ServerCluster Cluster(RedundancyMode mode = RedundancyMode.Warm) => new()
{
ClusterId = "c1",
Name = "Warsaw-West",
Enterprise = "zb",
Site = "warsaw-west",
RedundancyMode = mode,
CreatedBy = "test",
};
private static ClusterNode Node(string id, RedundancyRole role, string host, int port = 4840, string? appUri = null) => new()
{
NodeId = id,
ClusterId = "c1",
RedundancyRole = role,
Host = host,
OpcUaPort = port,
ApplicationUri = appUri ?? $"urn:{host}:OtOpcUa",
CreatedBy = "test",
};
[Fact]
public void SingleNode_Standalone_Loads()
{
var cluster = Cluster(RedundancyMode.None);
var nodes = new[] { Node("A", RedundancyRole.Standalone, "hostA") };
var topology = ClusterTopologyLoader.Load("A", cluster, nodes);
topology.SelfNodeId.ShouldBe("A");
topology.SelfRole.ShouldBe(RedundancyRole.Standalone);
topology.Peers.ShouldBeEmpty();
topology.SelfApplicationUri.ShouldBe("urn:hostA:OtOpcUa");
}
[Fact]
public void TwoNode_Cluster_LoadsSelfAndPeer()
{
var cluster = Cluster();
var nodes = new[]
{
Node("A", RedundancyRole.Primary, "hostA"),
Node("B", RedundancyRole.Secondary, "hostB"),
};
var topology = ClusterTopologyLoader.Load("A", cluster, nodes);
topology.SelfNodeId.ShouldBe("A");
topology.SelfRole.ShouldBe(RedundancyRole.Primary);
topology.Peers.Count.ShouldBe(1);
topology.Peers[0].NodeId.ShouldBe("B");
topology.Peers[0].Role.ShouldBe(RedundancyRole.Secondary);
}
[Fact]
public void ServerUriArray_Puts_Self_First_Peers_SortedLexicographically()
{
var cluster = Cluster();
var nodes = new[]
{
Node("A", RedundancyRole.Primary, "hostA", appUri: "urn:A"),
Node("B", RedundancyRole.Secondary, "hostB", appUri: "urn:B"),
};
var topology = ClusterTopologyLoader.Load("A", cluster, nodes);
topology.ServerUriArray().ShouldBe(["urn:A", "urn:B"]);
}
[Fact]
public void EmptyNodes_Throws()
{
Should.Throw<InvalidTopologyException>(
() => ClusterTopologyLoader.Load("A", Cluster(), []));
}
[Fact]
public void SelfNotInCluster_Throws()
{
var nodes = new[] { Node("B", RedundancyRole.Primary, "hostB") };
Should.Throw<InvalidTopologyException>(
() => ClusterTopologyLoader.Load("A-missing", Cluster(), nodes));
}
[Fact]
public void ThreeNodeCluster_Rejected_Per_Decision83()
{
var nodes = new[]
{
Node("A", RedundancyRole.Primary, "hostA"),
Node("B", RedundancyRole.Secondary, "hostB"),
Node("C", RedundancyRole.Secondary, "hostC"),
};
var ex = Should.Throw<InvalidTopologyException>(
() => ClusterTopologyLoader.Load("A", Cluster(), nodes));
ex.Message.ShouldContain("decision #83");
}
[Fact]
public void DuplicateApplicationUri_Rejected()
{
var nodes = new[]
{
Node("A", RedundancyRole.Primary, "hostA", appUri: "urn:shared"),
Node("B", RedundancyRole.Secondary, "hostB", appUri: "urn:shared"),
};
var ex = Should.Throw<InvalidTopologyException>(
() => ClusterTopologyLoader.Load("A", Cluster(), nodes));
ex.Message.ShouldContain("ApplicationUri");
}
[Fact]
public void TwoPrimaries_InWarmMode_Rejected()
{
var nodes = new[]
{
Node("A", RedundancyRole.Primary, "hostA"),
Node("B", RedundancyRole.Primary, "hostB"),
};
var ex = Should.Throw<InvalidTopologyException>(
() => ClusterTopologyLoader.Load("A", Cluster(RedundancyMode.Warm), nodes));
ex.Message.ShouldContain("2 Primary");
}
[Fact]
public void CrossCluster_Node_Rejected()
{
var foreign = Node("B", RedundancyRole.Secondary, "hostB");
foreign.ClusterId = "c-other";
var nodes = new[] { Node("A", RedundancyRole.Primary, "hostA"), foreign };
Should.Throw<InvalidTopologyException>(
() => ClusterTopologyLoader.Load("A", Cluster(), nodes));
}
[Fact]
public void None_Mode_Allows_Any_Role_Mix()
{
// Standalone clusters don't enforce Primary-count; operator can pick anything.
var cluster = Cluster(RedundancyMode.None);
var nodes = new[] { Node("A", RedundancyRole.Primary, "hostA") };
var topology = ClusterTopologyLoader.Load("A", cluster, nodes);
topology.Mode.ShouldBe(RedundancyMode.None);
}
}