using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;
/// <summary>
/// Process-singleton holder of the current <see cref="RedundancyTopology"/>. Reads the
/// shared config DB in <see cref="InitializeAsync"/> and re-reads it in
/// <see cref="RefreshAsync"/> (called after sp_PublishGeneration completes so
/// operator role-swaps take effect without a process restart).
/// </summary>
/// <remarks>
/// <para>
/// Per Phase 6.3 Stream A.1-A.2. The coordinator is intended to be the source of truth for
/// the service-level calculator's inputs: role (from topology), peer reachability (from
/// peer-probe loops — Stream B.1/B.2 follow-up), apply-in-progress, and topology-valid
/// (from invariant checks at load time + runtime detection of conflicting peer claims).
/// This class currently exposes the loaded topology (<see cref="Current"/>) and the
/// load-time validity flag (<see cref="IsTopologyValid"/>).
/// </para>
/// <para>
/// A successful load publishes a new <see cref="RedundancyTopology"/> instance with a
/// single volatile reference write, so a reader of <see cref="Current"/> sees either the
/// previous instance or the new one (NOTE(review): this assumes topology instances are not
/// mutated after construction — confirm). <see cref="Current"/> and
/// <see cref="IsTopologyValid"/> are written separately, so a reader that reads both is not
/// guaranteed to observe values from the same load. Concurrent refreshes are not
/// serialized; the last one to finish wins.
/// </para>
/// </remarks>
public sealed class RedundancyCoordinator
{
    private readonly IDbContextFactory _dbContextFactory;
    private readonly ILogger _logger;
    private readonly string _selfNodeId;
    private readonly string _selfClusterId;

    // Both fields are only accessed through Volatile.Read/Volatile.Write so that a load on
    // one thread is visible to readers on other threads without a lock.
    private RedundancyTopology? _current;
    private bool _topologyValid = true;

    /// <summary>
    /// Creates a coordinator for one node of one cluster. No DB access happens until
    /// <see cref="InitializeAsync"/> or <see cref="RefreshAsync"/> is called.
    /// </summary>
    /// <param name="dbContextFactory">Factory used to create one short-lived DB context per load.</param>
    /// <param name="logger">Receives load results and invariant violations.</param>
    /// <param name="selfNodeId">Node id this process runs as; passed to the topology loader.</param>
    /// <param name="selfClusterId">Cluster id whose cluster row and enabled nodes are loaded.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="dbContextFactory"/>, <paramref name="logger"/>,
    /// <paramref name="selfNodeId"/> or <paramref name="selfClusterId"/> is null.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="selfNodeId"/> or <paramref name="selfClusterId"/> is empty or whitespace.
    /// </exception>
    public RedundancyCoordinator(
        IDbContextFactory dbContextFactory,
        ILogger logger,
        string selfNodeId,
        string selfClusterId)
    {
        // Fail fast here instead of with a NullReferenceException on the first load.
        ArgumentNullException.ThrowIfNull(dbContextFactory);
        ArgumentNullException.ThrowIfNull(logger);
        ArgumentException.ThrowIfNullOrWhiteSpace(selfNodeId);
        ArgumentException.ThrowIfNullOrWhiteSpace(selfClusterId);

        _dbContextFactory = dbContextFactory;
        _logger = logger;
        _selfNodeId = selfNodeId;
        _selfClusterId = selfClusterId;
    }

    /// <summary>
    /// Last successfully loaded topology. Null until a load succeeds. A failed refresh
    /// leaves the previously loaded topology in place.
    /// </summary>
    public RedundancyTopology? Current => Volatile.Read(ref _current);

    /// <summary>
    /// True when the last load/refresh completed without an invariant violation. This
    /// includes the cluster row being present. False is meant to force the service-level
    /// calculation into its invalid-topology band regardless of other inputs. Starts as
    /// true before the first load.
    /// </summary>
    public bool IsTopologyValid => Volatile.Read(ref _topologyValid);

    /// <summary>Loads the topology for the first time.</summary>
    /// <param name="ct">Cancels the DB queries.</param>
    /// <exception cref="InvalidTopologyException">
    /// The cluster row is missing, or <see cref="ClusterTopologyLoader"/> rejects the topology.
    /// <see cref="IsTopologyValid"/> is set to false before the exception propagates.
    /// </exception>
    /// <remarks>DB and cancellation exceptions also propagate.</remarks>
    public async Task InitializeAsync(CancellationToken ct)
    {
        await RefreshInternalAsync(throwOnInvalid: true, ct).ConfigureAwait(false);
    }

    /// <summary>
    /// Re-reads the topology from the shared DB. Called after sp_PublishGeneration completes
    /// or after an Admin-triggered role-swap.
    /// </summary>
    /// <param name="ct">Cancels the DB queries.</param>
    /// <remarks>
    /// Does not throw on an invariant violation, including a missing cluster row. Instead it
    /// logs the violation, sets <see cref="IsTopologyValid"/> to false and keeps the
    /// previous <see cref="Current"/>. DB and cancellation exceptions still propagate.
    /// </remarks>
    public async Task RefreshAsync(CancellationToken ct)
    {
        await RefreshInternalAsync(throwOnInvalid: false, ct).ConfigureAwait(false);
    }

    /// <summary>
    /// Shared load path: reads the cluster row and its enabled nodes, builds the topology, and
    /// publishes it. Invariant violations are logged and recorded in
    /// <see cref="IsTopologyValid"/>, then rethrown only when <paramref name="throwOnInvalid"/>
    /// is true.
    /// </summary>
    private async Task RefreshInternalAsync(bool throwOnInvalid, CancellationToken ct)
    {
        await using var db = await _dbContextFactory.CreateDbContextAsync(ct).ConfigureAwait(false);

        try
        {
            // The lookup lives inside the try so that a missing cluster is handled like any
            // other invariant violation. Previously it escaped RefreshAsync and left
            // IsTopologyValid untouched.
            var cluster = await db.ServerClusters.AsNoTracking()
                .FirstOrDefaultAsync(c => c.ClusterId == _selfClusterId, ct).ConfigureAwait(false)
                ?? throw new InvalidTopologyException($"Cluster '{_selfClusterId}' not found in config DB.");

            // Disabled nodes are excluded from the topology entirely.
            var nodes = await db.ClusterNodes.AsNoTracking()
                .Where(n => n.ClusterId == _selfClusterId && n.Enabled)
                .ToListAsync(ct).ConfigureAwait(false);

            var topology = ClusterTopologyLoader.Load(_selfNodeId, cluster, nodes);

            // Publish the topology before flagging it valid.
            Volatile.Write(ref _current, topology);
            Volatile.Write(ref _topologyValid, true);

            _logger.LogInformation(
                "Redundancy topology loaded: cluster={Cluster} self={Self} role={Role} mode={Mode} peers={PeerCount}",
                topology.ClusterId, topology.SelfNodeId, topology.SelfRole, topology.Mode, topology.PeerCount);
        }
        catch (InvalidTopologyException ex)
        {
            // _current is intentionally left as the last good topology.
            Volatile.Write(ref _topologyValid, false);
            _logger.LogError(ex,
                "Redundancy topology invariant violation for cluster {Cluster}: {Reason}",
                _selfClusterId, ex.Message);
            if (throwOnInvalid)
            {
                throw;
            }
        }
    }
}