diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/ClusterTopologyLoader.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/ClusterTopologyLoader.cs new file mode 100644 index 0000000..c114064 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/ClusterTopologyLoader.cs @@ -0,0 +1,96 @@ +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; + +namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy; + +/// +/// Pure-function mapper from the shared config DB's + +/// rows to an immutable . +/// Validates Phase 6.3 Stream A.1 invariants and throws +/// on violation so the coordinator can fail startup +/// fast with a clear message rather than boot into an ambiguous state. +/// +/// +/// Stateless — the caller owns the DB round-trip + hands rows in. Keeping it pure makes +/// the invariant matrix testable without EF or SQL Server. +/// +public static class ClusterTopologyLoader +{ + /// Build a topology snapshot for the given self node. Throws on invariant violation. + public static RedundancyTopology Load(string selfNodeId, ServerCluster cluster, IReadOnlyList nodes) + { + ArgumentException.ThrowIfNullOrWhiteSpace(selfNodeId); + ArgumentNullException.ThrowIfNull(cluster); + ArgumentNullException.ThrowIfNull(nodes); + + ValidateClusterShape(cluster, nodes); + ValidateUniqueApplicationUris(nodes); + ValidatePrimaryCount(cluster, nodes); + + var self = nodes.FirstOrDefault(n => string.Equals(n.NodeId, selfNodeId, StringComparison.OrdinalIgnoreCase)) + ?? throw new InvalidTopologyException( + $"Self node '{selfNodeId}' is not a member of cluster '{cluster.ClusterId}'. " + + $"Members: {string.Join(", ", nodes.Select(n => n.NodeId))}."); + + var peers = nodes + .Where(n => !string.Equals(n.NodeId, selfNodeId, StringComparison.OrdinalIgnoreCase)) + .Select(n => new RedundancyPeer( + NodeId: n.NodeId, + Role: n.RedundancyRole, + Host: n.Host, + OpcUaPort: n.OpcUaPort, + DashboardPort: n.DashboardPort, + ApplicationUri: n.ApplicationUri)) + .ToList(); + + return new RedundancyTopology( + ClusterId: cluster.ClusterId, + SelfNodeId: self.NodeId, + SelfRole: self.RedundancyRole, + Mode: cluster.RedundancyMode, + Peers: peers, + SelfApplicationUri: self.ApplicationUri); + } + + private static void ValidateClusterShape(ServerCluster cluster, IReadOnlyList nodes) + { + if (nodes.Count == 0) + throw new InvalidTopologyException($"Cluster '{cluster.ClusterId}' has zero nodes."); + + // Decision #83 — v2.0 caps clusters at two nodes. + if (nodes.Count > 2) + throw new InvalidTopologyException( + $"Cluster '{cluster.ClusterId}' has {nodes.Count} nodes. v2.0 supports at most 2 nodes per cluster (decision #83)."); + + // Every node must belong to the given cluster. + var wrongCluster = nodes.FirstOrDefault(n => + !string.Equals(n.ClusterId, cluster.ClusterId, StringComparison.OrdinalIgnoreCase)); + if (wrongCluster is not null) + throw new InvalidTopologyException( + $"Node '{wrongCluster.NodeId}' belongs to cluster '{wrongCluster.ClusterId}', not '{cluster.ClusterId}'."); + } + + private static void ValidateUniqueApplicationUris(IReadOnlyList nodes) + { + var dup = nodes + .GroupBy(n => n.ApplicationUri, StringComparer.Ordinal) + .FirstOrDefault(g => g.Count() > 1); + if (dup is not null) + throw new InvalidTopologyException( + $"Nodes {string.Join(", ", dup.Select(n => n.NodeId))} share ApplicationUri '{dup.Key}'. " + + $"OPC UA Part 4 requires unique ApplicationUri per server — clients pin trust here (decision #86)."); + } + + private static void ValidatePrimaryCount(ServerCluster cluster, IReadOnlyList nodes) + { + // Standalone mode: any role is fine. Warm / Hot: at most one Primary per cluster. + if (cluster.RedundancyMode == RedundancyMode.None) return; + + var primaries = nodes.Count(n => n.RedundancyRole == RedundancyRole.Primary); + if (primaries > 1) + throw new InvalidTopologyException( + $"Cluster '{cluster.ClusterId}' has {primaries} Primary nodes in redundancy mode {cluster.RedundancyMode}. " + + $"At most one Primary per cluster (decision #84). Runtime detects and demotes both to ServiceLevel 2 " + + $"per the 8-state matrix; startup fails fast to surface the misconfiguration earlier."); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/RedundancyCoordinator.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/RedundancyCoordinator.cs new file mode 100644 index 0000000..75c8fd7 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/RedundancyCoordinator.cs @@ -0,0 +1,107 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; + +namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy; + +/// +/// Process-singleton holder of the current . Reads the +/// shared config DB at time + re-reads on +/// (called after sp_PublishGeneration completes so +/// operator role-swaps take effect without a process restart). +/// +/// +/// Per Phase 6.3 Stream A.1-A.2. The coordinator is the source of truth for the +/// inputs: role (from topology), peer reachability +/// (from peer-probe loops — Stream B.1/B.2 follow-up), apply-in-progress (from +/// ), topology-valid (from invariant checks at load time +/// + runtime detection of conflicting peer claims). +/// +/// Topology refresh is CAS-style: a new instance +/// replaces the old one atomically via . Readers +/// always see a coherent snapshot — never a partial transition. +/// +public sealed class RedundancyCoordinator +{ + private readonly IDbContextFactory _dbContextFactory; + private readonly ILogger _logger; + private readonly string _selfNodeId; + private readonly string _selfClusterId; + private RedundancyTopology? _current; + private bool _topologyValid = true; + + public RedundancyCoordinator( + IDbContextFactory dbContextFactory, + ILogger logger, + string selfNodeId, + string selfClusterId) + { + ArgumentException.ThrowIfNullOrWhiteSpace(selfNodeId); + ArgumentException.ThrowIfNullOrWhiteSpace(selfClusterId); + + _dbContextFactory = dbContextFactory; + _logger = logger; + _selfNodeId = selfNodeId; + _selfClusterId = selfClusterId; + } + + /// Last-loaded topology; null before completes. + public RedundancyTopology? Current => Volatile.Read(ref _current); + + /// + /// True when the last load/refresh completed without an invariant violation; false + /// forces into the + /// band regardless of other inputs. + /// + public bool IsTopologyValid => Volatile.Read(ref _topologyValid); + + /// Load the topology for the first time. Throws on invariant violation. + public async Task InitializeAsync(CancellationToken ct) + { + await RefreshInternalAsync(throwOnInvalid: true, ct).ConfigureAwait(false); + } + + /// + /// Re-read the topology from the shared DB. Called after sp_PublishGeneration + /// completes or after an Admin-triggered role-swap. Never throws — on invariant + /// violation it logs + flips false so the calculator + /// returns = 2. + /// + public async Task RefreshAsync(CancellationToken ct) + { + await RefreshInternalAsync(throwOnInvalid: false, ct).ConfigureAwait(false); + } + + private async Task RefreshInternalAsync(bool throwOnInvalid, CancellationToken ct) + { + await using var db = await _dbContextFactory.CreateDbContextAsync(ct).ConfigureAwait(false); + + var cluster = await db.ServerClusters.AsNoTracking() + .FirstOrDefaultAsync(c => c.ClusterId == _selfClusterId, ct).ConfigureAwait(false) + ?? throw new InvalidTopologyException($"Cluster '{_selfClusterId}' not found in config DB."); + + var nodes = await db.ClusterNodes.AsNoTracking() + .Where(n => n.ClusterId == _selfClusterId && n.Enabled) + .ToListAsync(ct).ConfigureAwait(false); + + try + { + var topology = ClusterTopologyLoader.Load(_selfNodeId, cluster, nodes); + Volatile.Write(ref _current, topology); + Volatile.Write(ref _topologyValid, true); + _logger.LogInformation( + "Redundancy topology loaded: cluster={Cluster} self={Self} role={Role} mode={Mode} peers={PeerCount}", + topology.ClusterId, topology.SelfNodeId, topology.SelfRole, topology.Mode, topology.PeerCount); + } + catch (InvalidTopologyException ex) + { + Volatile.Write(ref _topologyValid, false); + _logger.LogError(ex, + "Redundancy topology invariant violation for cluster {Cluster}: {Reason}", + _selfClusterId, ex.Message); + if (throwOnInvalid) throw; + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/RedundancyTopology.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/RedundancyTopology.cs new file mode 100644 index 0000000..e3ef766 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/RedundancyTopology.cs @@ -0,0 +1,55 @@ +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; + +namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy; + +/// +/// Snapshot of the cluster topology the holds. Read +/// once at startup + refreshed on publish-generation notification. Immutable — every +/// refresh produces a new instance so observers can compare identity-equality to detect +/// topology change. +/// +/// +/// Per Phase 6.3 Stream A.1. Invariants enforced by the loader (see +/// ): at most one Primary per cluster for +/// WarmActive/Hot redundancy modes; every node has a unique ApplicationUri (OPC UA +/// Part 4 requirement — clients pin trust here); at most 2 nodes total per cluster +/// (decision #83). +/// +public sealed record RedundancyTopology( + string ClusterId, + string SelfNodeId, + RedundancyRole SelfRole, + RedundancyMode Mode, + IReadOnlyList Peers, + string SelfApplicationUri) +{ + /// Peer count — 0 for a standalone (single-node) cluster, 1 for v2 two-node clusters. + public int PeerCount => Peers.Count; + + /// + /// ServerUriArray shape per OPC UA Part 4 §6.6.2.2 — self first, peers in stable + /// deterministic order (lexicographic by NodeId), self's ApplicationUri always at index 0. + /// + public IReadOnlyList ServerUriArray() => + new[] { SelfApplicationUri } + .Concat(Peers.OrderBy(p => p.NodeId, StringComparer.OrdinalIgnoreCase).Select(p => p.ApplicationUri)) + .ToList(); +} + +/// One peer in the cluster (every node other than self). +/// Peer's stable logical NodeId (e.g. "LINE3-OPCUA-B"). +/// Peer's declared redundancy role from the shared config DB. +/// Peer's hostname / IP — drives the health-probe target. +/// Peer's OPC UA endpoint port. +/// Peer's dashboard / health-endpoint port. +/// Peer's declared ApplicationUri (carried in ). +public sealed record RedundancyPeer( + string NodeId, + RedundancyRole Role, + string Host, + int OpcUaPort, + int DashboardPort, + string ApplicationUri); + +/// Thrown when the loader detects a topology-invariant violation at startup or refresh. +public sealed class InvalidTopologyException(string message) : Exception(message); diff --git a/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/ClusterTopologyLoaderTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/ClusterTopologyLoaderTests.cs new file mode 100644 index 0000000..d9202d3 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/ClusterTopologyLoaderTests.cs @@ -0,0 +1,163 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using ZB.MOM.WW.OtOpcUa.Server.Redundancy; + +namespace ZB.MOM.WW.OtOpcUa.Server.Tests; + +[Trait("Category", "Unit")] +public sealed class ClusterTopologyLoaderTests +{ + private static ServerCluster Cluster(RedundancyMode mode = RedundancyMode.Warm) => new() + { + ClusterId = "c1", + Name = "Warsaw-West", + Enterprise = "zb", + Site = "warsaw-west", + RedundancyMode = mode, + CreatedBy = "test", + }; + + private static ClusterNode Node(string id, RedundancyRole role, string host, int port = 4840, string? appUri = null) => new() + { + NodeId = id, + ClusterId = "c1", + RedundancyRole = role, + Host = host, + OpcUaPort = port, + ApplicationUri = appUri ?? $"urn:{host}:OtOpcUa", + CreatedBy = "test", + }; + + [Fact] + public void SingleNode_Standalone_Loads() + { + var cluster = Cluster(RedundancyMode.None); + var nodes = new[] { Node("A", RedundancyRole.Standalone, "hostA") }; + + var topology = ClusterTopologyLoader.Load("A", cluster, nodes); + + topology.SelfNodeId.ShouldBe("A"); + topology.SelfRole.ShouldBe(RedundancyRole.Standalone); + topology.Peers.ShouldBeEmpty(); + topology.SelfApplicationUri.ShouldBe("urn:hostA:OtOpcUa"); + } + + [Fact] + public void TwoNode_Cluster_LoadsSelfAndPeer() + { + var cluster = Cluster(); + var nodes = new[] + { + Node("A", RedundancyRole.Primary, "hostA"), + Node("B", RedundancyRole.Secondary, "hostB"), + }; + + var topology = ClusterTopologyLoader.Load("A", cluster, nodes); + + topology.SelfNodeId.ShouldBe("A"); + topology.SelfRole.ShouldBe(RedundancyRole.Primary); + topology.Peers.Count.ShouldBe(1); + topology.Peers[0].NodeId.ShouldBe("B"); + topology.Peers[0].Role.ShouldBe(RedundancyRole.Secondary); + } + + [Fact] + public void ServerUriArray_Puts_Self_First_Peers_SortedLexicographically() + { + var cluster = Cluster(); + var nodes = new[] + { + Node("A", RedundancyRole.Primary, "hostA", appUri: "urn:A"), + Node("B", RedundancyRole.Secondary, "hostB", appUri: "urn:B"), + }; + + var topology = ClusterTopologyLoader.Load("A", cluster, nodes); + + topology.ServerUriArray().ShouldBe(["urn:A", "urn:B"]); + } + + [Fact] + public void EmptyNodes_Throws() + { + Should.Throw( + () => ClusterTopologyLoader.Load("A", Cluster(), [])); + } + + [Fact] + public void SelfNotInCluster_Throws() + { + var nodes = new[] { Node("B", RedundancyRole.Primary, "hostB") }; + + Should.Throw( + () => ClusterTopologyLoader.Load("A-missing", Cluster(), nodes)); + } + + [Fact] + public void ThreeNodeCluster_Rejected_Per_Decision83() + { + var nodes = new[] + { + Node("A", RedundancyRole.Primary, "hostA"), + Node("B", RedundancyRole.Secondary, "hostB"), + Node("C", RedundancyRole.Secondary, "hostC"), + }; + + var ex = Should.Throw( + () => ClusterTopologyLoader.Load("A", Cluster(), nodes)); + ex.Message.ShouldContain("decision #83"); + } + + [Fact] + public void DuplicateApplicationUri_Rejected() + { + var nodes = new[] + { + Node("A", RedundancyRole.Primary, "hostA", appUri: "urn:shared"), + Node("B", RedundancyRole.Secondary, "hostB", appUri: "urn:shared"), + }; + + var ex = Should.Throw( + () => ClusterTopologyLoader.Load("A", Cluster(), nodes)); + ex.Message.ShouldContain("ApplicationUri"); + } + + [Fact] + public void TwoPrimaries_InWarmMode_Rejected() + { + var nodes = new[] + { + Node("A", RedundancyRole.Primary, "hostA"), + Node("B", RedundancyRole.Primary, "hostB"), + }; + + var ex = Should.Throw( + () => ClusterTopologyLoader.Load("A", Cluster(RedundancyMode.Warm), nodes)); + ex.Message.ShouldContain("2 Primary"); + } + + [Fact] + public void CrossCluster_Node_Rejected() + { + var foreign = Node("B", RedundancyRole.Secondary, "hostB"); + foreign.ClusterId = "c-other"; + + var nodes = new[] { Node("A", RedundancyRole.Primary, "hostA"), foreign }; + + Should.Throw( + () => ClusterTopologyLoader.Load("A", Cluster(), nodes)); + } + + [Fact] + public void None_Mode_Allows_Any_Role_Mix() + { + // Standalone clusters don't enforce Primary-count; operator can pick anything. + var cluster = Cluster(RedundancyMode.None); + var nodes = new[] { Node("A", RedundancyRole.Primary, "hostA") }; + + var topology = ClusterTopologyLoader.Load("A", cluster, nodes); + + topology.Mode.ShouldBe(RedundancyMode.None); + } +}