274 lines
9.9 KiB
C#
274 lines
9.9 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading.Tasks;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
|
|
|
|
/// <summary>
/// Per-platform + per-AppEngine runtime probe. Subscribes to <c>&lt;TagName&gt;.ScanState</c>
/// for each $WinPlatform and $AppEngine gobject, tracks Unknown → Running → Stopped
/// transitions, and fires <see cref="StateChanged"/> so <see cref="Backend.MxAccessGalaxyBackend"/>
/// can forward per-host events through the existing IPC <c>OnHostStatusChanged</c> event.
/// Pure-logic state machine with an injected clock so it's deterministically testable —
/// port of v1 <c>GalaxyRuntimeProbeManager</c> without the OPC UA node-manager coupling.
/// </summary>
/// <remarks>
/// State machine rules (documented in v1's <c>runtimestatus.md</c> and preserved here):
/// <list type="bullet">
/// <item><c>ScanState</c> is on-change-only — a stably-Running host may go hours without a
/// callback. Running → Stopped is driven by a callback that is not a good-quality
/// <c>true</c> (an explicit <c>ScanState=false</c>, a bad-quality update, or a non-boolean
/// value), never by starvation.</item>
/// <item>Unknown → Running is a startup transition and does NOT fire StateChanged (would
/// paint every host as "just recovered" at startup, which is noise).</item>
/// <item>Stopped → Running and Running → Stopped fire StateChanged. Unknown → Stopped
/// fires StateChanged because that's a first-known-bad signal operators need.</item>
/// <item>All public methods are thread-safe. Callbacks fire outside the internal lock to
/// avoid lock inversion with caller-owned state.</item>
/// </list>
/// </remarks>
public sealed class GalaxyRuntimeProbeManager : IDisposable
{
    /// <summary>Galaxy category id of $WinPlatform objects.</summary>
    public const int CategoryWinPlatform = 1;

    /// <summary>Galaxy category id of $AppEngine objects.</summary>
    public const int CategoryAppEngine = 3;

    /// <summary>Attribute suffix appended to a host's tag name to form its probe tag.</summary>
    public const string ProbeAttribute = ".ScanState";

    private readonly Func<DateTime> _clock;
    private readonly Func<string, Action<string, Vtq>, Task> _subscribe;
    private readonly Func<string, Task> _unsubscribe;
    private readonly object _lock = new();

    // probe tag → per-host state
    private readonly Dictionary<string, HostProbeState> _byProbe = new(StringComparer.OrdinalIgnoreCase);

    // tag name → probe tag (for reverse lookup on the desired-set diff)
    private readonly Dictionary<string, string> _probeByTagName = new(StringComparer.OrdinalIgnoreCase);

    // Written under _lock; volatile because SyncAsync/OnProbeCallback read it without the lock
    // as a cheap early-out.
    private volatile bool _disposed;

    /// <summary>
    /// Fires on every state transition that operators should react to. See class remarks
    /// for the rules on which transitions fire. Raised outside the internal lock.
    /// </summary>
    public event EventHandler<HostStateTransition>? StateChanged;

    /// <summary>
    /// Creates a probe manager that timestamps state changes with <see cref="DateTime.UtcNow"/>.
    /// </summary>
    /// <param name="subscribe">Advises a probe tag; receives the probe tag and the callback to deliver updates to.</param>
    /// <param name="unsubscribe">Unadvises a probe tag.</param>
    /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> or <paramref name="unsubscribe"/> is <c>null</c>.</exception>
    public GalaxyRuntimeProbeManager(
        Func<string, Action<string, Vtq>, Task> subscribe,
        Func<string, Task> unsubscribe)
        : this(subscribe, unsubscribe, () => DateTime.UtcNow) { }

    /// <summary>
    /// Test constructor with an injectable clock.
    /// </summary>
    /// <param name="subscribe">Advises a probe tag; receives the probe tag and the callback to deliver updates to.</param>
    /// <param name="unsubscribe">Unadvises a probe tag.</param>
    /// <param name="clock">Source of the timestamps recorded on state changes and callbacks.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    internal GalaxyRuntimeProbeManager(
        Func<string, Action<string, Vtq>, Task> subscribe,
        Func<string, Task> unsubscribe,
        Func<DateTime> clock)
    {
        _subscribe = subscribe ?? throw new ArgumentNullException(nameof(subscribe));
        _unsubscribe = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));
        _clock = clock ?? throw new ArgumentNullException(nameof(clock));
    }

    /// <summary>Number of probes currently advised. Test/dashboard hook.</summary>
    public int ActiveProbeCount
    {
        get { lock (_lock) return _byProbe.Count; }
    }

    /// <summary>
    /// Snapshot every currently-tracked host's state. One entry per probe; the returned
    /// list is a copy and does not change as further callbacks arrive.
    /// </summary>
    public IReadOnlyList<HostProbeSnapshot> SnapshotStates()
    {
        lock (_lock)
        {
            return _byProbe.Select(kv => new HostProbeSnapshot(
                TagName: kv.Value.TagName,
                State: kv.Value.State,
                LastChangedUtc: kv.Value.LastStateChangeUtc)).ToList();
        }
    }

    /// <summary>
    /// Query the current runtime state for <paramref name="tagName"/> (case-insensitive).
    /// Returns <see cref="HostRuntimeState.Unknown"/> when the host is not tracked.
    /// </summary>
    public HostRuntimeState GetState(string tagName)
    {
        lock (_lock)
        {
            if (_probeByTagName.TryGetValue(tagName, out var probe)
                && _byProbe.TryGetValue(probe, out var state))
                return state.State;
            return HostRuntimeState.Unknown;
        }
    }

    /// <summary>
    /// Diff the desired host set (filtered $WinPlatform / $AppEngine from the latest Discover)
    /// against the currently-tracked set and advise / unadvise as needed. Idempotent:
    /// calling twice with the same set does nothing. Entries with a blank tag name are
    /// ignored, and duplicate tag names (compared case-insensitively) are collapsed into a
    /// single probe. No-op after <see cref="Dispose"/>.
    /// </summary>
    /// <param name="desiredHosts">The hosts that should be probed. Category is not filtered here.</param>
    /// <exception cref="ArgumentNullException"><paramref name="desiredHosts"/> is <c>null</c>.</exception>
    /// <remarks>
    /// Subscribe and unsubscribe failures are swallowed. A host whose subscribe throws is
    /// rolled back to untracked, so the next call retries it.
    /// </remarks>
    public async Task SyncAsync(IEnumerable<HostProbeTarget> desiredHosts)
    {
        if (_disposed) return;
        if (desiredHosts is null) throw new ArgumentNullException(nameof(desiredHosts));

        // A set rather than ToDictionary: duplicate tag names (including ones differing only
        // by case) collapse into a single probe instead of throwing ArgumentException.
        var desired = new HashSet<string>(
            desiredHosts
                .Select(h => h.TagName)
                .Where(tag => !string.IsNullOrWhiteSpace(tag)),
            StringComparer.OrdinalIgnoreCase);

        List<string> toAdvise;
        List<string> toUnadvise;
        lock (_lock)
        {
            // Dispose may have run since the unlocked check above; don't repopulate cleared maps.
            if (_disposed) return;

            toAdvise = desired.Where(tag => !_probeByTagName.ContainsKey(tag)).ToList();

            var removedTags = _probeByTagName.Keys.Where(tag => !desired.Contains(tag)).ToList();
            toUnadvise = new List<string>(removedTags.Count);
            foreach (var tag in removedTags)
            {
                var probe = _probeByTagName[tag];
                toUnadvise.Add(probe);
                _probeByTagName.Remove(tag);
                _byProbe.Remove(probe);
            }

            // Register state before subscribing so a first callback delivered during (or right
            // after) the subscribe call is not dropped as an unknown probe.
            foreach (var tag in toAdvise)
            {
                var probe = tag + ProbeAttribute;
                _probeByTagName[tag] = probe;
                _byProbe[probe] = new HostProbeState
                {
                    TagName = tag,
                    State = HostRuntimeState.Unknown,
                    LastStateChangeUtc = _clock(),
                };
            }
        }

        foreach (var tag in toAdvise)
        {
            var probe = tag + ProbeAttribute;
            try
            {
                await _subscribe(probe, OnProbeCallback);
            }
            catch
            {
                // Roll back so a host without a live subscription is not reported as tracked
                // (GetState / SnapshotStates / ActiveProbeCount). Because the tag is no longer
                // tracked, the next SyncAsync that includes it will retry the subscribe.
                lock (_lock)
                {
                    _byProbe.Remove(probe);
                    _probeByTagName.Remove(tag);
                }
            }
        }

        foreach (var probe in toUnadvise)
        {
            try { await _unsubscribe(probe); } catch { /* best-effort cleanup */ }
        }
    }

    /// <summary>
    /// Public entry point for tests and internal callbacks. Production flow: the subscribe
    /// delegate passed to the constructor (MxAccessClient's SubscribeAsync) delivers VTQ
    /// updates to this method, which is wired up in <see cref="SyncAsync"/>. It takes the
    /// internal lock to update state and fires <see cref="StateChanged"/> after releasing it
    /// for any transition that matters.
    /// </summary>
    /// <param name="probeTag">The probe tag the update is for; untracked tags are ignored.</param>
    /// <param name="vtq">The value/timestamp/quality update.</param>
    public void OnProbeCallback(string probeTag, Vtq vtq)
    {
        if (_disposed) return;

        HostStateTransition? transition = null;
        lock (_lock)
        {
            // Never advised, already unadvised, rolled back, or cleared by Dispose: ignore.
            if (!_byProbe.TryGetValue(probeTag, out var state)) return;

            // Only a boolean true with quality >= 192 (presumably the OPC DA "Good" band, 0xC0)
            // counts as Running; false, lower quality, or a non-bool value all mean Stopped.
            var isRunning = vtq.Quality >= 192 && vtq.Value is bool b && b;
            var now = _clock();
            var previous = state.State;
            state.LastCallbackUtc = now;

            if (isRunning)
            {
                state.GoodUpdateCount++;
                if (previous != HostRuntimeState.Running)
                {
                    state.State = HostRuntimeState.Running;
                    state.LastStateChangeUtc = now;
                    // Unknown → Running is the silent startup transition; only a recovery fires.
                    if (previous == HostRuntimeState.Stopped)
                    {
                        transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Running, now);
                    }
                }
            }
            else
            {
                state.FailureCount++;
                if (previous != HostRuntimeState.Stopped)
                {
                    state.State = HostRuntimeState.Stopped;
                    state.LastStateChangeUtc = now;
                    transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Stopped, now);
                }
            }
        }

        // Raised outside the lock to avoid lock inversion with subscriber-owned state.
        if (transition is { } t)
        {
            StateChanged?.Invoke(this, t);
        }
    }

    /// <summary>
    /// Stops tracking every host. Afterwards, callbacks are ignored and <see cref="SyncAsync"/>
    /// is a no-op. Safe to call more than once.
    /// </summary>
    /// <remarks>
    /// Does not invoke the unsubscribe delegate for probes that are still advised.
    /// NOTE(review): presumably the owner tears down the underlying subscriptions — confirm.
    /// </remarks>
    public void Dispose()
    {
        lock (_lock)
        {
            if (_disposed) return;
            _disposed = true;
            _byProbe.Clear();
            _probeByTagName.Clear();
        }
    }

    /// <summary>Mutable per-host bookkeeping; only touched while holding <c>_lock</c>.</summary>
    private sealed class HostProbeState
    {
        // Host tag name without the ProbeAttribute suffix.
        public string TagName { get; set; } = "";
        public HostRuntimeState State { get; set; }
        // When tracking began or State last changed.
        public DateTime LastStateChangeUtc { get; set; }
        // Null until the first callback arrives.
        public DateTime? LastCallbackUtc { get; set; }
        public long GoodUpdateCount { get; set; }
        public long FailureCount { get; set; }
    }
}
|
|
|
|
/// <summary>
/// Runtime state of a probed Galaxy host. <see cref="Unknown"/> is the default value.
/// </summary>
public enum HostRuntimeState
{
    /// <summary>No probe callback has been seen yet for the host, or the host is not tracked.</summary>
    Unknown = 0,

    /// <summary>The most recent probe callback reported a good-quality <c>true</c>.</summary>
    Running = 1,

    /// <summary>The most recent probe callback was anything other than a good-quality <c>true</c>.</summary>
    Stopped = 2,
}
|
|
|
|
/// <summary>
/// A single host state change, delivered through <c>GalaxyRuntimeProbeManager.StateChanged</c>.
/// </summary>
/// <param name="TagName">Galaxy tag name of the host (without the probe attribute suffix).</param>
/// <param name="OldState">State before the transition.</param>
/// <param name="NewState">State after the transition.</param>
/// <param name="AtUtc">Clock reading taken when the transition was recorded.</param>
public sealed record HostStateTransition(string TagName, HostRuntimeState OldState, HostRuntimeState NewState, DateTime AtUtc);
|
|
|
|
/// <summary>
/// Point-in-time view of one tracked host, as produced by <c>GalaxyRuntimeProbeManager.SnapshotStates</c>.
/// </summary>
/// <param name="TagName">Galaxy tag name of the host.</param>
/// <param name="State">Runtime state when the snapshot was taken.</param>
/// <param name="LastChangedUtc">
/// Clock reading when tracking began or the state last changed. This includes the silent
/// Unknown → Running startup transition.
/// </param>
public sealed record HostProbeSnapshot(string TagName, HostRuntimeState State, DateTime LastChangedUtc);
|
|
|
|
/// <summary>
/// A host the caller wants probed: its Galaxy tag name plus its category id.
/// </summary>
/// <param name="TagName">Galaxy tag name of the object; blank names are skipped by the probe manager.</param>
/// <param name="CategoryId">Galaxy category id of the object.</param>
public readonly record struct HostProbeTarget(string TagName, int CategoryId)
{
    /// <summary>
    /// <c>true</c> when <see cref="CategoryId"/> is the $WinPlatform or $AppEngine category.
    /// Callers use this to pre-filter targets; <c>SyncAsync</c> does not apply it itself.
    /// </summary>
    public bool IsRuntimeHost => CategoryId is GalaxyRuntimeProbeManager.CategoryWinPlatform
        or GalaxyRuntimeProbeManager.CategoryAppEngine;
}
|