using Akka.Actor;
using Akka.Cluster;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
namespace ZB.MOM.WW.OtOpcUa.Cluster;
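/// <summary>
/// Thread-safe live view of cluster membership and role topology. Subscribes to
/// <see cref="ClusterEvent.IMemberEvent"/>, <see cref="ClusterEvent.LeaderChanged"/>, and
/// <see cref="ClusterEvent.RoleLeaderChanged"/> through an internal subscriber actor and keeps
/// a snapshot of role-to-members + role-to-leader. The CLR-facing event surface is
/// <see cref="RoleLeaderChanged"/>.
/// </summary>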
public sealed class ClusterRoleInfo : IClusterRoleInfo, IDisposable
{
// Akka cluster extension resolved from the hosting actor system; source of membership state.
private readonly Akka.Cluster.Cluster _cluster;
private readonly ILogger _logger;

// Identity and roles of this node, fixed at construction time.
private readonly CommonsNodeId _localNode;
private readonly HashSet<string> _localRoles;

// Guards _roleLeaders and _membersByRole; both are read from caller threads and
// written from the subscriber actor's message handlers.
private readonly object _lock = new();

// Role name -> member currently recorded as leader for that role (null when none is known).
private readonly Dictionary<string, Member?> _roleLeaders = new(StringComparer.Ordinal);

// Role name -> members tracked as carrying that role.
private readonly Dictionary<string, HashSet<Member>> _membersByRole = new(StringComparer.Ordinal);

// Internal subscriber actor; set back to null by Dispose.
private IActorRef? _subscriber;
/// <summary>
/// Resolves the Akka cluster extension for <paramref name="system"/>, derives the local node
/// identity and role set from <paramref name="options"/>, seeds the role/leader maps from the
/// current cluster state, and then starts the internal subscriber actor.
/// </summary>
/// <param name="system">Actor system whose cluster extension is observed and which hosts the subscriber actor.</param>
/// <param name="options">Options supplying <c>PublicHostname</c>, <c>Port</c> and <c>Roles</c> for the local node.</param>
/// <param name="logger">Logger used when a <see cref="RoleLeaderChanged"/> handler throws.</param>
// NOTE(review): generic type arguments appear to have been lost from this signature and body
// (`IOptions`, `ILogger`, `new HashSet(...)`). The options type is not visible in this file, so it
// cannot be reconstructed here — confirm against the original source before building.
public ClusterRoleInfo(ActorSystem system, IOptions options, ILogger logger)
{
_cluster = Akka.Cluster.Cluster.Get(system);
_logger = logger;
// NodeId encodes host:port so cluster members on shared hosts (test loopback, dev VMs
// sharing a bind IP) stay distinct. Production hosts have unique DNS names so the port
// suffix is harmless redundancy.
_localNode = CommonsNodeId.Parse($"{options.Value.PublicHostname}:{options.Value.Port}");
_localRoles = new HashSet(options.Value.Roles, StringComparer.Ordinal);
// Seed before the actor exists so queries made right after construction already see the
// members and role leaders present in the cluster state at this point.
SeedFromCurrentState();
// The actor name is fixed, so a second instance created on the same actor system would ask
// for the same name — NOTE(review): confirm only one instance is created per system.
_subscriber = system.ActorOf(Props.Create(() => new SubscriberActor(this)), "clusterroleinfo-subscriber");
}
/// <summary>Identity of this node, built from the configured public hostname and port.</summary>
public CommonsNodeId LocalNode => _localNode;

/// <summary>Roles configured for this node; compared with ordinal string semantics.</summary>
public IReadOnlySet<string> LocalRoles => _localRoles;

/// <summary>Returns <c>true</c> when <paramref name="role"/> is one of this node's configured roles.</summary>
/// <param name="role">Role name to test (ordinal, case-sensitive comparison).</param>
public bool HasRole(string role) => _localRoles.Contains(role);
/// <summary>
/// Returns the node ids of all members currently tracked for <paramref name="role"/>.
/// </summary>
/// <param name="role">Role name to look up (ordinal comparison).</param>
/// <returns>
/// A freshly allocated list of node ids, or an empty list when no member has been tracked
/// for the role. The result is a copy and does not change as membership changes.
/// </returns>
public IReadOnlyList<CommonsNodeId> MembersWithRole(string role)
{
    lock (_lock)
    {
        if (!_membersByRole.TryGetValue(role, out var members))
        {
            return Array.Empty<CommonsNodeId>();
        }

        // Materialize while holding the lock: the underlying set is mutated by the
        // member-event handler, so a lazily evaluated query must not escape.
        return members
            .Select(m => ToNodeId(m.Address))
            .ToArray();
    }
}
/// <summary>
/// Returns the node id of the member currently recorded as leader for <paramref name="role"/>.
/// </summary>
/// <param name="role">Role name to look up (ordinal comparison).</param>
/// <returns>The leader's node id, or <c>null</c> when the role is unknown or has no recorded leader.</returns>
public CommonsNodeId? RoleLeader(string role)
{
    lock (_lock)
    {
        if (_roleLeaders.TryGetValue(role, out var current) && current is not null)
        {
            return ToNodeId(current.Address);
        }

        return null;
    }
}
/// <summary>
/// Raised when the leader recorded for a role changes. Handlers receive a
/// <c>RoleLeaderChangedEventArgs</c> carrying the role, the previous leader and the new leader,
/// and are invoked after the internal lock has been released. An exception thrown by a handler
/// is caught and logged as a warning.
/// </summary>
// NOTE(review): this was probably declared with a generic handler type whose argument was lost
// along with the other generic arguments in this file — confirm against the interface declaration.
public event EventHandler? RoleLeaderChanged;
/// <summary>
/// Populates the role-to-members and role-to-leader maps from a single capture of the
/// cluster state, so queries work before any cluster event has been delivered.
/// </summary>
private void SeedFromCurrentState()
{
    // Capture once and use this capture for everything below, so the member list and the
    // role leaders are taken from the same state object.
    var snapshot = _cluster.State;

    lock (_lock)
    {
        foreach (var member in snapshot.Members)
        {
            foreach (var role in member.Roles)
            {
                if (!_membersByRole.TryGetValue(role, out var members))
                {
                    members = new HashSet<Member>();
                    _membersByRole[role] = members;
                }

                members.Add(member);
            }
        }

        foreach (var role in snapshot.Members.SelectMany(m => m.Roles).Distinct())
        {
            var leaderAddr = snapshot.RoleLeader(role);

            // A role whose leader address is absent, or does not match any member in the
            // snapshot, is recorded with a null leader.
            _roleLeaders[role] = leaderAddr is not null
                ? snapshot.Members.FirstOrDefault(m => m.Address == leaderAddr)
                : null;
        }
    }
}
/// <summary>
/// Applies a cluster member event to the role-to-members map. Called by the subscriber actor.
/// </summary>
/// <param name="evt">
/// The member event. <see cref="ClusterEvent.MemberUp"/> adds the member under each of its roles,
/// <see cref="ClusterEvent.MemberRemoved"/> removes it from each of its roles, and every other
/// member event is ignored.
/// </param>
internal void HandleMemberEvent(ClusterEvent.IMemberEvent evt)
{
    lock (_lock)
    {
        switch (evt)
        {
            case ClusterEvent.MemberUp up:
                foreach (var role in up.Member.Roles)
                {
                    if (!_membersByRole.TryGetValue(role, out var members))
                    {
                        members = new HashSet<Member>();
                        _membersByRole[role] = members;
                    }

                    members.Add(up.Member);
                }

                break;

            case ClusterEvent.MemberRemoved removed:
                foreach (var role in removed.Member.Roles)
                {
                    // The (possibly now empty) set is kept under its role key.
                    if (_membersByRole.TryGetValue(role, out var members))
                    {
                        members.Remove(removed.Member);
                    }
                }

                break;
        }
    }
}
/// <summary>
/// Records the new leader for a role and raises <see cref="RoleLeaderChanged"/> when the
/// recorded leader actually changed. Called by the subscriber actor.
/// </summary>
/// <param name="evt">The role-leader change; <c>Leader</c> is null when the role has no leader.</param>
/// <remarks>
/// The leader address is resolved to a member by checking the members this instance tracks
/// for the role first, then the cluster's current state. If neither contains the address the
/// role is recorded as having no leader and a warning is logged, rather than dropping the
/// leader silently.
/// </remarks>
internal void HandleRoleLeaderChanged(ClusterEvent.RoleLeaderChanged evt)
{
    CommonsNodeId? previous = null;
    CommonsNodeId? next = null;
    bool raise;
    var unresolved = false;

    lock (_lock)
    {
        if (_roleLeaders.TryGetValue(evt.Role, out var prevMember) && prevMember is not null)
        {
            previous = ToNodeId(prevMember.Address);
        }

        Member? nextMember = null;
        if (evt.Leader is not null)
        {
            // Prefer the members tracked by this instance: they are updated by the same
            // subscriber that delivers this event.
            // NOTE(review): Cluster.State is presumably maintained independently of this
            // subscriber's mailbox and may not contain the leader at this instant — confirm
            // against the Akka.NET cluster documentation.
            if (_membersByRole.TryGetValue(evt.Role, out var tracked))
            {
                nextMember = tracked.FirstOrDefault(m => m.Address == evt.Leader);
            }

            nextMember ??= _cluster.State.Members.FirstOrDefault(m => m.Address == evt.Leader);
            unresolved = nextMember is null;
        }

        _roleLeaders[evt.Role] = nextMember;
        if (nextMember is not null)
        {
            next = ToNodeId(nextMember.Address);
        }

        raise = !Nullable.Equals(previous, next);
    }

    // Log and raise outside the lock so neither the logger nor event handlers run while
    // the internal state is locked.
    if (unresolved)
    {
        _logger.LogWarning(
            "Role leader {Leader} for role {Role} is not a known cluster member; recording no leader",
            evt.Leader,
            evt.Role);
    }

    if (!raise)
    {
        return;
    }

    try
    {
        RoleLeaderChanged?.Invoke(this, new RoleLeaderChangedEventArgs
        {
            Role = evt.Role,
            PreviousLeader = previous,
            NewLeader = next,
        });
    }
    catch (Exception ex)
    {
        // A faulty handler must not take down the subscriber actor that called us.
        _logger.LogWarning(ex, "RoleLeaderChanged subscriber threw for role {Role}", evt.Role);
    }
}
/// <summary>
/// Converts an Akka address into a node id of the form <c>host:port</c>, substituting an
/// empty host and port <c>0</c> when the address does not carry them.
/// </summary>
private static CommonsNodeId ToNodeId(Akka.Actor.Address address)
{
    var host = address.Host ?? string.Empty;
    var port = address.Port ?? 0;
    return CommonsNodeId.Parse($"{host}:{port}");
}
/// <summary>
/// Stops the internal subscriber actor by sending it a poison pill and clears the reference,
/// so a repeated call sends nothing further.
/// </summary>
public void Dispose()
{
    var subscriber = _subscriber;
    if (subscriber is not null)
    {
        subscriber.Tell(PoisonPill.Instance);
    }

    _subscriber = null;
}
/// <summary>
/// Actor that subscribes to cluster events and forwards the relevant ones to the owning
/// <see cref="ClusterRoleInfo"/>.
/// </summary>
private sealed class SubscriberActor : ReceiveActor
{
    /// <summary>Registers the message handlers that forward cluster events to <paramref name="owner"/>.</summary>
    /// <param name="owner">Instance whose role/leader maps are updated by the forwarded events.</param>
    public SubscriberActor(ClusterRoleInfo owner)
    {
        Receive<ClusterEvent.IMemberEvent>(e => owner.HandleMemberEvent(e));
        Receive<ClusterEvent.RoleLeaderChanged>(e => owner.HandleRoleLeaderChanged(e));
        Receive<ClusterEvent.LeaderChanged>(_ => { /* no-op for now; reserved for ServiceLevel calc */ });

        // NOTE(review): message type inferred from the inline comment only — confirm.
        Receive<ClusterEvent.CurrentClusterState>(_ => { /* seeded from initial snapshot */ });
    }

    /// <summary>Subscribes this actor to member, leader and role-leader events, requesting the initial state as events.</summary>
    protected override void PreStart()
    {
        Akka.Cluster.Cluster.Get(Context.System).Subscribe(
            Self,
            ClusterEvent.InitialStateAsEvents,
            typeof(ClusterEvent.IMemberEvent),
            typeof(ClusterEvent.LeaderChanged),
            typeof(ClusterEvent.RoleLeaderChanged));
    }

    /// <summary>Removes this actor's cluster event subscriptions when it stops.</summary>
    protected override void PostStop() =>
        Akka.Cluster.Cluster.Get(Context.System).Unsubscribe(Self);
}
}