diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ClusterRoleInfo.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ClusterRoleInfo.cs new file mode 100644 index 0000000..b344c9a --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ClusterRoleInfo.cs @@ -0,0 +1,182 @@ +using Akka.Actor; +using Akka.Cluster; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; +using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId; + +namespace ZB.MOM.WW.OtOpcUa.Cluster; + +/// +/// Thread-safe live view of cluster membership and role topology. Subscribes to +/// , , and +/// through an internal subscriber actor and keeps +/// a snapshot of role-to-members + role-to-leader. The CLR-facing event surface is +/// . +/// +public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable +{ + private readonly Akka.Cluster.Cluster _cluster; + private readonly ILogger _logger; + private readonly CommonsNodeId _localNode; + private readonly HashSet _localRoles; + private readonly object _lock = new(); + private readonly Dictionary _roleLeaders = new(StringComparer.Ordinal); + private readonly Dictionary> _membersByRole = new(StringComparer.Ordinal); + private IActorRef? _subscriber; + + public ClusterRoleInfo(ActorSystem system, IOptions options, ILogger logger) + { + _cluster = Akka.Cluster.Cluster.Get(system); + _logger = logger; + _localNode = CommonsNodeId.Parse(options.Value.PublicHostname); + _localRoles = new HashSet(options.Value.Roles, StringComparer.Ordinal); + + SeedFromCurrentState(); + _subscriber = system.ActorOf(Props.Create(() => new SubscriberActor(this)), "clusterroleinfo-subscriber"); + } + + public CommonsNodeId LocalNode => _localNode; + + public IReadOnlySet LocalRoles => _localRoles; + + public bool HasRole(string role) => _localRoles.Contains(role); + + public IReadOnlyList MembersWithRole(string role) + { + lock (_lock) + { + if (!_membersByRole.TryGetValue(role, out var members)) return Array.Empty(); + return members + .Select(m => CommonsNodeId.Parse(m.Address.Host ?? string.Empty)) + .ToArray(); + } + } + + public CommonsNodeId? RoleLeader(string role) + { + lock (_lock) + { + return _roleLeaders.TryGetValue(role, out var leader) && leader is not null + ? CommonsNodeId.Parse(leader.Address.Host ?? string.Empty) + : (CommonsNodeId?)null; + } + } + + public event EventHandler? RoleLeaderChanged; + + private void SeedFromCurrentState() + { + var snapshot = _cluster.State; + lock (_lock) + { + foreach (var member in snapshot.Members) + foreach (var role in member.Roles) + { + if (!_membersByRole.TryGetValue(role, out var set)) + _membersByRole[role] = set = new HashSet(); + set.Add(member); + } + + foreach (var role in snapshot.Members.SelectMany(m => m.Roles).Distinct()) + { + var leaderAddr = _cluster.State.RoleLeader(role); + _roleLeaders[role] = leaderAddr is not null + ? snapshot.Members.FirstOrDefault(m => m.Address == leaderAddr) + : null; + } + } + } + + internal void HandleMemberEvent(ClusterEvent.IMemberEvent evt) + { + lock (_lock) + { + switch (evt) + { + case ClusterEvent.MemberUp up: + foreach (var role in up.Member.Roles) + { + if (!_membersByRole.TryGetValue(role, out var set)) + _membersByRole[role] = set = new HashSet(); + set.Add(up.Member); + } + break; + case ClusterEvent.MemberRemoved removed: + foreach (var role in removed.Member.Roles) + if (_membersByRole.TryGetValue(role, out var set)) + set.Remove(removed.Member); + break; + } + } + } + + internal void HandleRoleLeaderChanged(ClusterEvent.RoleLeaderChanged evt) + { + CommonsNodeId? previous = null; + CommonsNodeId? next = null; + var raise = false; + + lock (_lock) + { + _roleLeaders.TryGetValue(evt.Role, out var prevMember); + if (prevMember is not null) + previous = CommonsNodeId.Parse(prevMember.Address.Host ?? string.Empty); + + var nextMember = evt.Leader is null + ? null + : _cluster.State.Members.FirstOrDefault(m => m.Address == evt.Leader); + + _roleLeaders[evt.Role] = nextMember; + if (nextMember is not null) + next = CommonsNodeId.Parse(nextMember.Address.Host ?? string.Empty); + + raise = !Nullable.Equals(previous, next); + } + + if (!raise) return; + try + { + RoleLeaderChanged?.Invoke(this, new RoleLeaderChangedEventArgs + { + Role = evt.Role, + PreviousLeader = previous, + NewLeader = next, + }); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "RoleLeaderChanged subscriber threw for role {Role}", evt.Role); + } + } + + public void Dispose() + { + _subscriber?.Tell(PoisonPill.Instance); + _subscriber = null; + } + + private sealed class SubscriberActor : ReceiveActor + { + public SubscriberActor(ClusterRoleInfo owner) + { + Receive(e => owner.HandleMemberEvent(e)); + Receive(e => owner.HandleRoleLeaderChanged(e)); + Receive(_ => { /* no-op for now; reserved for ServiceLevel calc */ }); + Receive(_ => { /* seeded from initial snapshot */ }); + } + + protected override void PreStart() + { + Akka.Cluster.Cluster.Get(Context.System).Subscribe( + Self, + ClusterEvent.InitialStateAsEvents, + typeof(ClusterEvent.IMemberEvent), + typeof(ClusterEvent.LeaderChanged), + typeof(ClusterEvent.RoleLeaderChanged)); + } + + protected override void PostStop() => + Akka.Cluster.Cluster.Get(Context.System).Unsubscribe(Self); + } +}