Files
lmxopcua/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ClusterRoleInfo.cs
T
Joseph Doherty 64e3fbe035
v2-ci / build (push) Failing after 1m43s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
docs: backfill XML documentation across 756 files
Adds <summary>, <param>, <typeparam>, and <inheritdoc/> tags to public
members surfaced by commentchecker — resolves 5,847 of 5,869 issues
(99.6%) across three /fixdocs passes.
2026-05-28 08:10:17 -04:00

214 lines
8.5 KiB
C#

using Akka.Actor;
using Akka.Cluster;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
namespace ZB.MOM.WW.OtOpcUa.Cluster;
/// <summary>
/// Thread-safe live view of cluster membership and role topology. Subscribes to
/// <see cref="ClusterEvent.IMemberEvent"/>, <see cref="ClusterEvent.RoleLeaderChanged"/>, and
/// <see cref="ClusterEvent.LeaderChanged"/> through an internal subscriber actor and keeps
/// a snapshot of role-to-members + role-to-leader. The CLR-facing event surface is
/// <see cref="IClusterRoleInfo.RoleLeaderChanged"/>.
/// </summary>
public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable
{
private readonly Akka.Cluster.Cluster _cluster;
private readonly ILogger<ClusterRoleInfo> _logger;
private readonly CommonsNodeId _localNode;
private readonly HashSet<string> _localRoles;
private readonly object _lock = new();
private readonly Dictionary<string, Member?> _roleLeaders = new(StringComparer.Ordinal);
private readonly Dictionary<string, HashSet<Member>> _membersByRole = new(StringComparer.Ordinal);
private IActorRef? _subscriber;
/// <summary>Initializes a new instance of the ClusterRoleInfo class.</summary>
/// <param name="system">The Akka actor system.</param>
/// <param name="options">The cluster configuration options.</param>
/// <param name="logger">The logger instance.</param>
public ClusterRoleInfo(ActorSystem system, IOptions<AkkaClusterOptions> options, ILogger<ClusterRoleInfo> logger)
{
_cluster = Akka.Cluster.Cluster.Get(system);
_logger = logger;
// NodeId encodes host:port so cluster members on shared hosts (test loopback, dev VMs
// sharing a bind IP) stay distinct. Production hosts have unique DNS names so the port
// suffix is harmless redundancy.
_localNode = CommonsNodeId.Parse($"{options.Value.PublicHostname}:{options.Value.Port}");
_localRoles = new HashSet<string>(options.Value.Roles, StringComparer.Ordinal);
SeedFromCurrentState();
_subscriber = system.ActorOf(Props.Create(() => new SubscriberActor(this)), "clusterroleinfo-subscriber");
}
/// <summary>Gets the local cluster node identifier.</summary>
public CommonsNodeId LocalNode => _localNode;
/// <summary>Gets the set of roles assigned to the local node.</summary>
public IReadOnlySet<string> LocalRoles => _localRoles;
/// <summary>Checks if the local node has a specific role.</summary>
/// <param name="role">The role name to check.</param>
/// <returns>True if the local node has the specified role; otherwise false.</returns>
public bool HasRole(string role) => _localRoles.Contains(role);
/// <summary>Gets all cluster members that have a specific role.</summary>
/// <param name="role">The role name.</param>
/// <returns>A read-only list of node IDs with the specified role.</returns>
public IReadOnlyList<CommonsNodeId> MembersWithRole(string role)
{
lock (_lock)
{
if (!_membersByRole.TryGetValue(role, out var members)) return Array.Empty<CommonsNodeId>();
return members
.Select(m => ToNodeId(m.Address))
.ToArray();
}
}
/// <summary>Gets the current leader node for a specific role.</summary>
/// <param name="role">The role name.</param>
/// <returns>The node ID of the current role leader, or null if no leader is elected.</returns>
public CommonsNodeId? RoleLeader(string role)
{
lock (_lock)
{
return _roleLeaders.TryGetValue(role, out var leader) && leader is not null
? ToNodeId(leader.Address)
: (CommonsNodeId?)null;
}
}
/// <summary>Occurs when the leader for a role changes.</summary>
public event EventHandler<RoleLeaderChangedEventArgs>? RoleLeaderChanged;
private void SeedFromCurrentState()
{
var snapshot = _cluster.State;
lock (_lock)
{
foreach (var member in snapshot.Members)
foreach (var role in member.Roles)
{
if (!_membersByRole.TryGetValue(role, out var set))
_membersByRole[role] = set = new HashSet<Member>();
set.Add(member);
}
foreach (var role in snapshot.Members.SelectMany(m => m.Roles).Distinct())
{
var leaderAddr = _cluster.State.RoleLeader(role);
_roleLeaders[role] = leaderAddr is not null
? snapshot.Members.FirstOrDefault(m => m.Address == leaderAddr)
: null;
}
}
}
/// <summary>Handles a cluster member event (member up/removed).</summary>
/// <param name="evt">The member event from the cluster.</param>
internal void HandleMemberEvent(ClusterEvent.IMemberEvent evt)
{
lock (_lock)
{
switch (evt)
{
case ClusterEvent.MemberUp up:
foreach (var role in up.Member.Roles)
{
if (!_membersByRole.TryGetValue(role, out var set))
_membersByRole[role] = set = new HashSet<Member>();
set.Add(up.Member);
}
break;
case ClusterEvent.MemberRemoved removed:
foreach (var role in removed.Member.Roles)
if (_membersByRole.TryGetValue(role, out var set))
set.Remove(removed.Member);
break;
}
}
}
/// <summary>Handles a role leader change event.</summary>
/// <param name="evt">The role leader changed event from the cluster.</param>
internal void HandleRoleLeaderChanged(ClusterEvent.RoleLeaderChanged evt)
{
CommonsNodeId? previous = null;
CommonsNodeId? next = null;
var raise = false;
lock (_lock)
{
_roleLeaders.TryGetValue(evt.Role, out var prevMember);
if (prevMember is not null)
previous = ToNodeId(prevMember.Address);
var nextMember = evt.Leader is null
? null
: _cluster.State.Members.FirstOrDefault(m => m.Address == evt.Leader);
_roleLeaders[evt.Role] = nextMember;
if (nextMember is not null)
next = ToNodeId(nextMember.Address);
raise = !Nullable.Equals(previous, next);
}
if (!raise) return;
try
{
RoleLeaderChanged?.Invoke(this, new RoleLeaderChangedEventArgs
{
Role = evt.Role,
PreviousLeader = previous,
NewLeader = next,
});
}
catch (Exception ex)
{
_logger.LogWarning(ex, "RoleLeaderChanged subscriber threw for role {Role}", evt.Role);
}
}
private static CommonsNodeId ToNodeId(Akka.Actor.Address address) =>
CommonsNodeId.Parse($"{address.Host ?? string.Empty}:{address.Port ?? 0}");
/// <summary>Disposes the ClusterRoleInfo and stops the subscriber actor.</summary>
public void Dispose()
{
_subscriber?.Tell(PoisonPill.Instance);
_subscriber = null;
}
private sealed class SubscriberActor : ReceiveActor
{
/// <summary>Initializes a new instance of the SubscriberActor class.</summary>
/// <param name="owner">The ClusterRoleInfo instance to forward events to.</param>
public SubscriberActor(ClusterRoleInfo owner)
{
Receive<ClusterEvent.IMemberEvent>(e => owner.HandleMemberEvent(e));
Receive<ClusterEvent.RoleLeaderChanged>(e => owner.HandleRoleLeaderChanged(e));
Receive<ClusterEvent.LeaderChanged>(_ => { /* no-op for now; reserved for ServiceLevel calc */ });
Receive<ClusterEvent.CurrentClusterState>(_ => { /* seeded from initial snapshot */ });
}
/// <inheritdoc />
protected override void PreStart()
{
Akka.Cluster.Cluster.Get(Context.System).Subscribe(
Self,
ClusterEvent.InitialStateAsEvents,
typeof(ClusterEvent.IMemberEvent),
typeof(ClusterEvent.LeaderChanged),
typeof(ClusterEvent.RoleLeaderChanged));
}
/// <inheritdoc />
protected override void PostStop() =>
Akka.Cluster.Cluster.Get(Context.System).Unsubscribe(Self);
}
}