using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Host.Domain;
namespace ZB.MOM.WW.OtOpcUa.Host.MxAccess
{
///
/// Advises <ObjectName>.ScanState on every deployed $WinPlatform and
/// $AppEngine, tracks their runtime state (Unknown / Running / Stopped), and notifies
/// the owning node manager on Running↔Stopped transitions so it can proactively flip every
/// OPC UA variable hosted by that object to BadOutOfService (and clear on recovery).
///
///
/// State machine semantics are documented in runtimestatus.md. Key facts:
///
/// - ScanState is delivered on-change only — no periodic heartbeat. A stably
/// Running host may go hours without a callback.
/// - Running → Stopped is driven by explicit error callbacks or ScanState = false,
/// NEVER by starvation. The only starvation check applies to the initial Unknown state.
/// - When the MxAccess transport is disconnected, returns every
/// entry with regardless of the underlying state,
/// because we can't observe anything through a dead transport.
/// - The stop/start callbacks fire synchronously from whichever thread delivered the
/// probe update. The manager releases its own lock before invoking them to avoid
/// lock-inversion deadlocks with the node manager's Lock.
///
///
public sealed class GalaxyRuntimeProbeManager : IDisposable
{
private static readonly ILogger Log = Serilog.Log.ForContext();
private const int CategoryWinPlatform = 1;
private const int CategoryAppEngine = 3;
private const string KindWinPlatform = "$WinPlatform";
private const string KindAppEngine = "$AppEngine";
private const string ProbeAttribute = ".ScanState";
private readonly IMxAccessClient _client;
private readonly TimeSpan _unknownTimeout;
private readonly Action? _onHostStopped;
private readonly Action? _onHostRunning;
private readonly Func _clock;
// Key: probe tag reference (e.g. "DevAppEngine.ScanState").
// Value: the current runtime status for that host, kept in sync on every probe callback
// and queried via GetSnapshot for dashboard rendering.
private readonly Dictionary _byProbe =
new Dictionary(StringComparer.OrdinalIgnoreCase);
// Reverse index: gobject_id -> probe tag, so Sync() can diff new/removed hosts efficiently.
private readonly Dictionary _probeByGobjectId = new Dictionary();
private readonly object _lock = new object();
private bool _disposed;
///
/// Initializes a new probe manager. and
/// are invoked synchronously on Running↔Stopped
/// transitions so the owning node manager can invalidate / restore the hosted subtree.
///
public GalaxyRuntimeProbeManager(
IMxAccessClient client,
int unknownTimeoutSeconds,
Action? onHostStopped = null,
Action? onHostRunning = null)
: this(client, unknownTimeoutSeconds, onHostStopped, onHostRunning, () => DateTime.UtcNow)
{
}
internal GalaxyRuntimeProbeManager(
IMxAccessClient client,
int unknownTimeoutSeconds,
Action? onHostStopped,
Action? onHostRunning,
Func clock)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
_unknownTimeout = TimeSpan.FromSeconds(Math.Max(1, unknownTimeoutSeconds));
_onHostStopped = onHostStopped;
_onHostRunning = onHostRunning;
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
}
///
/// Gets the number of active probe subscriptions. Surfaced on the dashboard Subscriptions
/// panel so operators can see bridge-owned probe count separately from the total.
///
public int ActiveProbeCount
{
get
{
lock (_lock)
return _byProbe.Count;
}
}
///
/// Returns when the galaxy runtime host identified by
/// is currently in the
/// state. Used by the node manager's Read path to short-circuit on-demand reads of tags
/// hosted by a known-stopped runtime object, preventing MxAccess from serving stale
/// cached values as Good. Unlike this check uses the
/// underlying state directly — transport-disconnected hosts will NOT report Stopped here
/// (they report their last-known state), because connection-loss is handled by the
/// normal MxAccess error paths and we don't want this method to double-flag.
///
public bool IsHostStopped(int gobjectId)
{
lock (_lock)
{
if (_probeByGobjectId.TryGetValue(gobjectId, out var probe)
&& _byProbe.TryGetValue(probe, out var status))
{
return status.State == GalaxyRuntimeState.Stopped;
}
}
return false;
}
///
/// Returns a point-in-time clone of the runtime status for the host identified by
/// , or when no probe is registered
/// for that object. Used by the node manager to populate the synthetic $RuntimeState
/// child variables on each host object. Uses the underlying state directly (not the
/// transport-gated rewrite), matching .
///
public GalaxyRuntimeStatus? GetHostStatus(int gobjectId)
{
lock (_lock)
{
if (_probeByGobjectId.TryGetValue(gobjectId, out var probe)
&& _byProbe.TryGetValue(probe, out var status))
{
return Clone(status, forceUnknown: false);
}
}
return null;
}
///
/// Diffs the supplied hierarchy against the active probe set, advising new hosts and
/// unadvising removed ones. The hierarchy is filtered to runtime host categories
/// ($WinPlatform, $AppEngine) — non-host rows are ignored. Idempotent: a second call
/// with the same hierarchy performs no Advise / Unadvise work.
///
///
/// Sync is synchronous on MxAccess: is
/// awaited for each new host, so for a galaxy with N runtime hosts the call blocks for
/// ~N round-trips. This is acceptable because it only runs during address-space build
/// and rebuild, not on the hot path.
///
public async Task SyncAsync(IReadOnlyList hierarchy)
{
if (_disposed || hierarchy == null)
return;
// Filter to runtime hosts and project to the expected probe tag name.
var desired = new Dictionary();
foreach (var obj in hierarchy)
{
if (obj.CategoryId != CategoryWinPlatform && obj.CategoryId != CategoryAppEngine)
continue;
if (string.IsNullOrWhiteSpace(obj.TagName))
continue;
var probe = obj.TagName + ProbeAttribute;
var kind = obj.CategoryId == CategoryWinPlatform ? KindWinPlatform : KindAppEngine;
desired[obj.GobjectId] = (probe, kind, obj);
}
// Compute diffs under lock, release lock before issuing SDK calls (which can block).
// toSubscribe carries the gobject id alongside the probe name so the rollback path on
// subscribe failure can unwind both dictionaries without a reverse lookup.
List<(int GobjectId, string Probe)> toSubscribe;
List toUnsubscribe;
lock (_lock)
{
toSubscribe = new List<(int, string)>();
toUnsubscribe = new List();
foreach (var kvp in desired)
{
if (_probeByGobjectId.TryGetValue(kvp.Key, out var existingProbe))
{
// Already tracked: ensure the status entry is aligned (tag rename path is
// intentionally not supported — if the probe changed, treat it as remove+add).
if (!string.Equals(existingProbe, kvp.Value.Probe, StringComparison.OrdinalIgnoreCase))
{
toUnsubscribe.Add(existingProbe);
_byProbe.Remove(existingProbe);
_probeByGobjectId.Remove(kvp.Key);
toSubscribe.Add((kvp.Key, kvp.Value.Probe));
_byProbe[kvp.Value.Probe] = MakeInitialStatus(kvp.Value.Obj, kvp.Value.Kind);
_probeByGobjectId[kvp.Key] = kvp.Value.Probe;
}
}
else
{
toSubscribe.Add((kvp.Key, kvp.Value.Probe));
_byProbe[kvp.Value.Probe] = MakeInitialStatus(kvp.Value.Obj, kvp.Value.Kind);
_probeByGobjectId[kvp.Key] = kvp.Value.Probe;
}
}
// Remove hosts that are no longer in the desired set.
var toRemove = _probeByGobjectId.Keys.Where(id => !desired.ContainsKey(id)).ToList();
foreach (var id in toRemove)
{
var probe = _probeByGobjectId[id];
toUnsubscribe.Add(probe);
_byProbe.Remove(probe);
_probeByGobjectId.Remove(id);
}
}
// Apply the diff outside the lock.
foreach (var (gobjectId, probe) in toSubscribe)
{
try
{
await _client.SubscribeAsync(probe, OnProbeValueChanged);
Log.Information("Galaxy runtime probe advised: {Probe}", probe);
}
catch (Exception ex)
{
Log.Warning(ex, "Failed to advise galaxy runtime probe {Probe}", probe);
// Roll back the pending entry so Tick() can't later transition a never-advised
// probe from Unknown to Stopped and fan out a false-negative host-down signal.
// A concurrent SyncAsync may have re-added the same gobject under a new probe
// name, so compare against the captured probe string before removing.
lock (_lock)
{
if (_probeByGobjectId.TryGetValue(gobjectId, out var current)
&& string.Equals(current, probe, StringComparison.OrdinalIgnoreCase))
{
_probeByGobjectId.Remove(gobjectId);
}
_byProbe.Remove(probe);
}
}
}
foreach (var probe in toUnsubscribe)
{
try
{
await _client.UnsubscribeAsync(probe);
}
catch (Exception ex)
{
Log.Debug(ex, "Failed to unadvise galaxy runtime probe {Probe} during sync", probe);
}
}
}
///
/// Routes an OnTagValueChanged callback to the probe state machine. Returns
/// when matches a bridge-owned probe
/// (in which case the owning node manager should skip its normal variable-update path).
///
public bool HandleProbeUpdate(string tagRef, Vtq vtq)
{
if (_disposed || string.IsNullOrEmpty(tagRef))
return false;
GalaxyRuntimeStatus? status;
int fromToGobjectId = 0;
GalaxyRuntimeState? transitionTo = null;
lock (_lock)
{
if (!_byProbe.TryGetValue(tagRef, out status))
return false; // not a probe — let the caller handle it normally
var now = _clock();
var isRunning = vtq.Quality.IsGood() && vtq.Value is bool b && b;
status.LastStateCallbackTime = now;
status.LastScanState = vtq.Value as bool?;
if (isRunning)
{
status.GoodUpdateCount++;
status.LastError = null;
if (status.State != GalaxyRuntimeState.Running)
{
// Only fire the host-running callback on a true Stopped → Running
// recovery. Unknown → Running happens once at startup for every host
// and is not a recovery — firing ClearHostVariablesBadQuality there
// would wipe Bad status set by the concurrently-stopping other host
// on variables that span both lists.
var wasStopped = status.State == GalaxyRuntimeState.Stopped;
status.State = GalaxyRuntimeState.Running;
status.LastStateChangeTime = now;
if (wasStopped)
{
transitionTo = GalaxyRuntimeState.Running;
fromToGobjectId = status.GobjectId;
}
}
}
else
{
status.FailureCount++;
status.LastError = BuildErrorDetail(vtq);
if (status.State != GalaxyRuntimeState.Stopped)
{
status.State = GalaxyRuntimeState.Stopped;
status.LastStateChangeTime = now;
transitionTo = GalaxyRuntimeState.Stopped;
fromToGobjectId = status.GobjectId;
}
}
}
// Invoke transition callbacks outside the lock to avoid inverting the node manager's
// lock order when it subsequently takes its own Lock to flip hosted variables.
if (transitionTo == GalaxyRuntimeState.Stopped)
{
Log.Information("Galaxy runtime {Probe} transitioned Running → Stopped ({Err})",
tagRef, status?.LastError ?? "(no detail)");
try { _onHostStopped?.Invoke(fromToGobjectId); }
catch (Exception ex) { Log.Warning(ex, "onHostStopped callback threw for {Probe}", tagRef); }
}
else if (transitionTo == GalaxyRuntimeState.Running)
{
Log.Information("Galaxy runtime {Probe} transitioned → Running", tagRef);
try { _onHostRunning?.Invoke(fromToGobjectId); }
catch (Exception ex) { Log.Warning(ex, "onHostRunning callback threw for {Probe}", tagRef); }
}
return true;
}
///
/// Periodic tick — flips Unknown entries to Stopped once their registration has been
/// outstanding for longer than the configured timeout without ever receiving a first
/// callback. Does nothing to Running or Stopped entries.
///
public void Tick()
{
if (_disposed)
return;
var transitions = new List();
lock (_lock)
{
var now = _clock();
foreach (var entry in _byProbe.Values)
{
if (entry.State != GalaxyRuntimeState.Unknown)
continue;
// LastStateChangeTime is set at creation to "now" so the timeout is measured
// from when the probe was advised.
if (entry.LastStateChangeTime.HasValue
&& now - entry.LastStateChangeTime.Value > _unknownTimeout)
{
entry.State = GalaxyRuntimeState.Stopped;
entry.LastStateChangeTime = now;
entry.FailureCount++;
entry.LastError = "Probe never received an initial callback within the unknown-resolution timeout";
transitions.Add(entry.GobjectId);
}
}
}
foreach (var gobjectId in transitions)
{
Log.Warning("Galaxy runtime gobject {GobjectId} timed out in Unknown state → Stopped", gobjectId);
try { _onHostStopped?.Invoke(gobjectId); }
catch (Exception ex) { Log.Warning(ex, "onHostStopped callback threw during tick for {GobjectId}", gobjectId); }
}
}
///
/// Returns a read-only snapshot of every tracked host. When the MxAccess transport is
/// disconnected, every entry is rewritten to Unknown on the way out so operators aren't
/// misled by cached per-host state — the Connection panel is the primary signal in that
/// case. The underlying _byProbe map is not modified.
///
public IReadOnlyList GetSnapshot()
{
var transportDown = _client.State != ConnectionState.Connected;
lock (_lock)
{
var result = new List(_byProbe.Count);
foreach (var entry in _byProbe.Values)
result.Add(Clone(entry, forceUnknown: transportDown));
// Stable ordering by name so dashboard rows don't jitter between refreshes.
result.Sort((a, b) => string.CompareOrdinal(a.ObjectName, b.ObjectName));
return result;
}
}
///
public void Dispose()
{
List probes;
lock (_lock)
{
if (_disposed)
return;
_disposed = true;
probes = _byProbe.Keys.ToList();
_byProbe.Clear();
_probeByGobjectId.Clear();
}
foreach (var probe in probes)
{
try
{
_client.UnsubscribeAsync(probe).GetAwaiter().GetResult();
}
catch (Exception ex)
{
Log.Debug(ex, "Failed to unadvise galaxy runtime probe {Probe} during Dispose", probe);
}
}
}
private void OnProbeValueChanged(string tagRef, Vtq vtq)
{
HandleProbeUpdate(tagRef, vtq);
}
private GalaxyRuntimeStatus MakeInitialStatus(GalaxyObjectInfo obj, string kind)
{
return new GalaxyRuntimeStatus
{
ObjectName = obj.TagName,
GobjectId = obj.GobjectId,
Kind = kind,
State = GalaxyRuntimeState.Unknown,
LastStateChangeTime = _clock()
};
}
private static GalaxyRuntimeStatus Clone(GalaxyRuntimeStatus src, bool forceUnknown)
{
return new GalaxyRuntimeStatus
{
ObjectName = src.ObjectName,
GobjectId = src.GobjectId,
Kind = src.Kind,
State = forceUnknown ? GalaxyRuntimeState.Unknown : src.State,
LastStateCallbackTime = src.LastStateCallbackTime,
LastStateChangeTime = src.LastStateChangeTime,
LastScanState = src.LastScanState,
LastError = forceUnknown ? null : src.LastError,
GoodUpdateCount = src.GoodUpdateCount,
FailureCount = src.FailureCount
};
}
private static string BuildErrorDetail(Vtq vtq)
{
if (vtq.Quality.IsBad())
return $"bad quality ({vtq.Quality})";
if (vtq.Quality.IsUncertain())
return $"uncertain quality ({vtq.Quality})";
if (vtq.Value is bool b && !b)
return "ScanState = false (OffScan)";
return $"unexpected value: {vtq.Value ?? "(null)"}";
}
}
}