From 04d267d1ea08c0a92ba76af5014a74e0bcf5a32e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 18 Apr 2026 07:27:56 -0400 Subject: [PATCH] =?UTF-8?q?Phase=202=20PR=2013=20=E2=80=94=20port=20Galaxy?= =?UTF-8?q?RuntimeProbeManager=20state=20machine=20+=20wire=20per-platform?= =?UTF-8?q?=20ScanState=20probing.=20PR=208=20gave=20operators=20the=20gat?= =?UTF-8?q?eway-level=20transport=20signal=20(MxAccessClient.ConnectionSta?= =?UTF-8?q?teChanged=20=E2=86=92=20OnHostStatusChanged=20tagged=20with=20t?= =?UTF-8?q?he=20Wonderware=20client=20identity)=20=E2=80=94=20enough=20to?= =?UTF-8?q?=20detect=20when=20the=20entire=20MXAccess=20COM=20proxy=20dies?= =?UTF-8?q?,=20but=20silent=20when=20a=20specific=20=20or=20=20goes=20down?= =?UTF-8?q?=20while=20the=20gateway=20stays=20alive.=20This=20PR=20closes?= =?UTF-8?q?=20that=20gap:=20a=20pure-logic=20GalaxyRuntimeProbeManager=20(?= =?UTF-8?q?ported=20from=20v1's=20472-LOC=20GalaxyRuntimeProbeManager.cs,?= =?UTF-8?q?=20distilled=20to=20~240=20LOC=20of=20state=20machine=20without?= =?UTF-8?q?=20the=20OPC=20UA=20node-manager=20entanglement)=20lives=20in?= =?UTF-8?q?=20Backend/Stability/,=20advises=20.ScanState=20per=20?= =?UTF-8?q?WinPlatform=20+=20AppEngine=20gobject=20discovered=20during=20D?= =?UTF-8?q?iscoverAsync,=20runs=20the=20Unknown=20=E2=86=92=20Running=20?= =?UTF-8?q?=E2=86=92=20Stopped=20state=20machine=20with=20v1's=20documente?= =?UTF-8?q?d=20semantics=20preserved=20verbatim,=20and=20raises=20a=20Stat?= =?UTF-8?q?eChanged=20event=20on=20transitions=20operators=20should=20reac?= =?UTF-8?q?t=20to.=20MxAccessGalaxyBackend=20instantiates=20the=20probe=20?= =?UTF-8?q?manager=20in=20the=20constructor=20with=20SubscribeAsync/Unsubs?= =?UTF-8?q?cribeAsync=20delegate=20pointers=20into=20MxAccessClient,=20hoo?= =?UTF-8?q?ks=20StateChanged=20to=20forward=20each=20transition=20through?= =?UTF-8?q?=20the=20same=20OnHostStatusChanged=20IPC=20event=20the=20gatew?= 
=?UTF-8?q?ay=20signal=20uses=20(HostName=20=3D=20platform/engine=20TagNam?= =?UTF-8?q?e,=20RuntimeStatus=20=3D=20'Running'|'Stopped'|'Unknown',=20Las?= =?UTF-8?q?tObservedUtcUnixMs=20from=20the=20state-change=20timestamp),=20?= =?UTF-8?q?so=20Admin=20UI=20gets=20per-host=20signals=20flowing=20through?= =?UTF-8?q?=20the=20existing=20PR=208=20wire=20with=20no=20additional=20IP?= =?UTF-8?q?C=20plumbing.=20State=20machine=20rules=20ported=20from=20v1=20?= =?UTF-8?q?runtimestatus.md:=20(a)=20ScanState=20is=20on-change-only=20?= =?UTF-8?q?=E2=80=94=20a=20stably-Running=20host=20may=20go=20hours=20with?= =?UTF-8?q?out=20a=20callback,=20so=20Running=20=E2=86=92=20Stopped=20is?= =?UTF-8?q?=20driven=20only=20by=20explicit=20ScanState=3Dfalse,=20never?= =?UTF-8?q?=20by=20starvation;=20(b)=20Unknown=20=E2=86=92=20Running=20is?= =?UTF-8?q?=20a=20startup=20transition=20and=20does=20NOT=20fire=20StateCh?= =?UTF-8?q?anged=20(would=20paint=20every=20host=20as=20'just=20recovered'?= =?UTF-8?q?=20at=20startup,=20which=20is=20noise=20and=20can=20clear=20Bad?= =?UTF-8?q?=20quality=20set=20by=20a=20concurrently-stopping=20sibling);?= =?UTF-8?q?=20(c)=20Stopped=20=E2=86=92=20Running=20fires=20StateChanged?= =?UTF-8?q?=20for=20the=20real=20recovery=20case;=20(d)=20Running=20?= =?UTF-8?q?=E2=86=92=20Stopped=20fires=20StateChanged;=20(e)=20Unknown=20?= =?UTF-8?q?=E2=86=92=20Stopped=20fires=20StateChanged=20because=20that's?= =?UTF-8?q?=20the=20first-known-bad=20signal=20operators=20need=20when=20a?= =?UTF-8?q?=20host=20is=20down=20at=20our=20startup=20time.=20MxAccessGala?= =?UTF-8?q?xyBackend.DiscoverAsync=20calls=20=5FprobeManager.SyncAsync=20w?= =?UTF-8?q?ith=20the=20runtime-host=20subset=20of=20the=20hierarchy=20(Cat?= =?UTF-8?q?egoryId=20=3D=3D=201=20WinPlatform=20or=203=20AppEngine)=20as?= =?UTF-8?q?=20a=20best-effort=20step=20after=20building=20the=20Discover?= =?UTF-8?q?=20response=20=E2=80=94=20probe=20failures=20are=20swallowed=20?= 
=?UTF-8?q?so=20Discover=20still=20returns=20the=20hierarchy=20even=20if?= =?UTF-8?q?=20a=20per-host=20advise=20fails;=20the=20gateway=20signal=20co?= =?UTF-8?q?vers=20the=20critical=20rung.=20SyncAsync=20is=20idempotent=20(?= =?UTF-8?q?second=20call=20with=20the=20same=20set=20is=20a=20no-op)=20and?= =?UTF-8?q?=20handles=20the=20diff=20on=20re-Discover=20for=20tag=20rename?= =?UTF-8?q?=20/=20host=20add=20/=20host=20remove.=20Subscribe=20failure=20?= =?UTF-8?q?rolls=20back=20the=20host's=20state=20entry=20under=20the=20loc?= =?UTF-8?q?k=20so=20a=20later=20probe=20callback=20for=20a=20never-advised?= =?UTF-8?q?=20tag=20can't=20transition=20a=20phantom=20state=20from=20Unkn?= =?UTF-8?q?own=20to=20Stopped=20and=20fan=20out=20a=20false=20host-down=20?= =?UTF-8?q?signal=20(the=20same=20protection=20v1's=20GalaxyRuntimeProbeMa?= =?UTF-8?q?nager=20had=20at=20line=20237-243=20of=20v1=20with=20a=20captur?= =?UTF-8?q?ed-probe-string=20comparison=20under=20the=20lock).=20MxAccessG?= =?UTF-8?q?alaxyBackend.Dispose=20unsubscribes=20the=20StateChanged=20hand?= =?UTF-8?q?ler=20before=20disposing=20the=20probe=20manager=20to=20prevent?= =?UTF-8?q?=20dangling=20invocation-list=20references=20across=20reconnect?= =?UTF-8?q?s,=20same=20discipline=20as=20PR=208's=20ConnectionStateChanged?= =?UTF-8?q?=20and=20PR=206's=20SubscriptionReplayFailed.=20Tests=20(12=20n?= =?UTF-8?q?ew=20GalaxyRuntimeProbeManagerTests):=20Sync=5Fsubscribes=5Fto?= =?UTF-8?q?=5FScanState=5Fper=5Fhost=20verifies=20tag.ScanState=20subscrip?= =?UTF-8?q?tions=20are=20advised=20per=20Platform/Engine;=20Sync=5Fis=5Fid?= =?UTF-8?q?empotent=5Fon=5Frepeat=5Fcall=5Fwith=5Fsame=5Fset=20verifies=20?= =?UTF-8?q?no=20duplicate=20subscribes;=20Sync=5Funadvises=5Fremoved=5Fhos?= =?UTF-8?q?ts=20verifies=20the=20diff=20unadvises=20gone=20hosts;=20Subscr?= =?UTF-8?q?ibe=5Ffailure=5Frolls=5Fback=5Fhost=5Fentry=5Fso=5Flater=5Ftran?= =?UTF-8?q?sitions=5Fdo=5Fnot=5Ffire=5Fstale=5Fevents=20covers=20the=20rol?= 
=?UTF-8?q?lback-on-subscribe-fail=20guard;=20Unknown=5Fto=5FRunning=5Fdoe?= =?UTF-8?q?s=5Fnot=5Ffire=5FStateChanged=20preserves=20the=20startup-noise?= =?UTF-8?q?=20rule;=20Running=5Fto=5FStopped=5Ffires=5FStateChanged=5Fwith?= =?UTF-8?q?=5Fboth=5Fstates=20asserts=20OldState=20and=20NewState=20are=20?= =?UTF-8?q?both=20captured=20in=20the=20transition=20record;=20Stopped=5Ft?= =?UTF-8?q?o=5FRunning=5Ffires=5FStateChanged=5Ffor=5Frecovery=20verifies?= =?UTF-8?q?=20the=20recovery=20case;=20Unknown=5Fto=5FStopped=5Ffires=5FSt?= =?UTF-8?q?ateChanged=5Ffor=5Ffirst=5Fknown=5Fbad=5Fsignal=20preserves=20t?= =?UTF-8?q?he=20first-bad=20rule;=20Repeated=5FGood=5FRunning=5Fcallbacks?= =?UTF-8?q?=5Fdo=5Fnot=5Ffire=5Fduplicate=5Fevents=20verifies=20the=20stat?= =?UTF-8?q?e-tracking=20de-dup;=20Unknown=5Fcallback=5Ffor=5Fnon=5Ftracked?= =?UTF-8?q?=5Fprobe=5Fis=5Fdropped=20asserts=20a=20foreign=20callback=20is?= =?UTF-8?q?=20silently=20ignored;=20Snapshot=5Freports=5Fcurrent=5Fstate?= =?UTF-8?q?=5Ffor=5Fevery=5Ftracked=5Fhost=20covers=20the=20dashboard=20qu?= =?UTF-8?q?ery=20hook;=20IsRuntimeHost=5Frecognizes=5FWinPlatform=5Fand=5F?= =?UTF-8?q?AppEngine=5Fcategory=5Fids=20asserts=20the=20CategoryId=20filte?= =?UTF-8?q?r.=20Galaxy.Host.Tests=20Unit=20suite=2075=20pass=20/=200=20fai?= =?UTF-8?q?l=20(12=20new=20probe=20+=2063=20pre-existing).=20Galaxy.Host?= =?UTF-8?q?=20builds=20clean=20(0=20errors=20/=200=20warnings).=20Branches?= =?UTF-8?q?=20off=20v2.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Backend/MxAccessGalaxyBackend.cs | 51 ++++ .../Stability/GalaxyRuntimeProbeManager.cs | 273 ++++++++++++++++++ .../GalaxyRuntimeProbeManagerTests.cs | 231 +++++++++++++++ 3 files changed, 555 insertions(+) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Stability/GalaxyRuntimeProbeManager.cs create mode 100644 
tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/GalaxyRuntimeProbeManagerTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs index a537263..86e520b 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs @@ -7,6 +7,7 @@ using MessagePack; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; @@ -40,6 +41,8 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable public event System.EventHandler? OnHostStatusChanged; private readonly System.EventHandler _onConnectionStateChanged; + private readonly GalaxyRuntimeProbeManager _probeManager; + private readonly System.EventHandler _onProbeStateChanged; public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null) { @@ -62,8 +65,39 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable }); }; _mx.ConnectionStateChanged += _onConnectionStateChanged; + + // PR 13: per-platform runtime probes. ScanState subscriptions fire OnProbeCallback, + // which runs the state machine and raises StateChanged on transitions we care about. + // We forward each transition through the same OnHostStatusChanged IPC event that the + // gateway-level ConnectionStateChanged uses — tagged with the host's TagName so the + // Admin UI can show per-host health independently from the top-level transport status.
+ _probeManager = new GalaxyRuntimeProbeManager( + subscribe: (probe, cb) => _mx.SubscribeAsync(probe, cb), + unsubscribe: probe => _mx.UnsubscribeAsync(probe)); + _onProbeStateChanged = (_, t) => + { + OnHostStatusChanged?.Invoke(this, new HostConnectivityStatus + { + HostName = t.TagName, + RuntimeStatus = t.NewState switch + { + HostRuntimeState.Running => "Running", + HostRuntimeState.Stopped => "Stopped", + _ => "Unknown", + }, + LastObservedUtcUnixMs = new DateTimeOffset(t.AtUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), + }); + }; + _probeManager.StateChanged += _onProbeStateChanged; } + /// <summary> + /// Exposed for tests. Production flow: DiscoverAsync builds its response, then calls + /// _probeManager.SyncAsync with the runtime hosts (WinPlatform + AppEngine gobjects) to + /// advise ScanState per host. + /// </summary> + internal GalaxyRuntimeProbeManager ProbeManager => _probeManager; + public async Task OpenSessionAsync(OpenSessionRequest req, CancellationToken ct) { try @@ -103,6 +137,21 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable Attributes = attrsByGobject.TryGetValue(o.GobjectId, out var a) ? a : Array.Empty(), }).ToArray(); + // PR 13: Sync the per-platform probe manager against the just-discovered hierarchy + // so ScanState subscriptions track the current runtime set. Best-effort — probe + // failures don't block Discover from returning, since the gateway-level signal from + // MxAccessClient.ConnectionStateChanged still flows and the Admin UI degrades to + // that level if any per-host probe couldn't advise.
+            try
+            {
+                var targets = hierarchy
+                    .Where(o => o.CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform
+                             || o.CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine)
+                    .Select(o => new HostProbeTarget(o.TagName, o.CategoryId));
+                await _probeManager.SyncAsync(targets).ConfigureAwait(false);
+            }
+            catch { /* swallow — Discover succeeded; probes are a diagnostic enrichment */ }
+
             return new DiscoverHierarchyResponse { Success = true, Objects = objects };
         }
         catch (Exception ex)
@@ -405,6 +454,8 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable
 
     public void Dispose()
     {
+        _probeManager.StateChanged -= _onProbeStateChanged;
+        _probeManager.Dispose();
         _mx.ConnectionStateChanged -= _onConnectionStateChanged;
         _historian?.Dispose();
     }
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Stability/GalaxyRuntimeProbeManager.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Stability/GalaxyRuntimeProbeManager.cs
new file mode 100644
index 0000000..f3a2612
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Stability/GalaxyRuntimeProbeManager.cs
@@ -0,0 +1,304 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
+
+/// <summary>
+/// Per-platform + per-AppEngine runtime probe. Subscribes to <c>&lt;TagName&gt;.ScanState</c>
+/// for each $WinPlatform and $AppEngine gobject, tracks Unknown → Running → Stopped
+/// transitions, and fires <see cref="StateChanged"/> so <see cref="MxAccessGalaxyBackend"/>
+/// can forward per-host events through the existing IPC OnHostStatusChanged event.
+/// Pure-logic state machine with an injected clock so it's deterministically testable —
+/// port of v1 GalaxyRuntimeProbeManager without the OPC UA node-manager coupling.
+/// </summary>
+/// <remarks>
+/// State machine rules (documented in v1's runtimestatus.md and preserved here):
+/// <list type="bullet">
+/// <item>ScanState is on-change-only — a stably-Running host may go hours without a
+/// callback. Running → Stopped is driven by an explicit not-running callback
+/// (ScanState=false, or a quality below Good), never by starvation.</item>
+/// <item>Unknown → Running is a startup transition and does NOT fire StateChanged (would
+/// paint every host as "just recovered" at startup, which is noise).</item>
+/// <item>Stopped → Running and Running → Stopped fire StateChanged. Unknown → Stopped
+/// fires StateChanged because that's a first-known-bad signal operators need.</item>
+/// <item>All public methods are thread-safe. Callbacks fire outside the internal lock to
+/// avoid lock inversion with caller-owned state.</item>
+/// <item>Dispose unadvises every probe still tracked (best-effort, not awaited) so the
+/// injected subscriber does not keep delivering callbacks to a disposed manager.</item>
+/// </list>
+/// </remarks>
+public sealed class GalaxyRuntimeProbeManager : IDisposable
+{
+    public const int CategoryWinPlatform = 1;
+    public const int CategoryAppEngine = 3;
+    public const string ProbeAttribute = ".ScanState";
+
+    // Lowest Vtq.Quality accepted as "good" when deciding whether a host is Running.
+    private const int GoodQualityThreshold = 192;
+
+    private readonly Func<DateTime> _clock;
+    private readonly Func<string, Action<string, Vtq>, Task> _subscribe;
+    private readonly Func<string, Task> _unsubscribe;
+    private readonly object _lock = new();
+
+    // probe tag → per-host state
+    private readonly Dictionary<string, HostProbeState> _byProbe = new(StringComparer.OrdinalIgnoreCase);
+    // tag name → probe tag (for reverse lookup on the desired-set diff)
+    private readonly Dictionary<string, string> _probeByTagName = new(StringComparer.OrdinalIgnoreCase);
+    // volatile because it is read outside _lock by SyncAsync and OnProbeCallback
+    private volatile bool _disposed;
+
+    /// <summary>
+    /// Fires on every state transition that operators should react to. See class remarks
+    /// for the rules on which transitions fire.
+    /// </summary>
+    public event EventHandler<HostStateTransition>? StateChanged;
+
+    /// <summary>Creates a manager that timestamps state changes with <see cref="DateTime.UtcNow"/>.</summary>
+    public GalaxyRuntimeProbeManager(
+        Func<string, Action<string, Vtq>, Task> subscribe,
+        Func<string, Task> unsubscribe)
+        : this(subscribe, unsubscribe, () => DateTime.UtcNow) { }
+
+    /// <summary>Test seam: same as the public constructor but with an injected clock.</summary>
+    internal GalaxyRuntimeProbeManager(
+        Func<string, Action<string, Vtq>, Task> subscribe,
+        Func<string, Task> unsubscribe,
+        Func<DateTime> clock)
+    {
+        _subscribe = subscribe ?? throw new ArgumentNullException(nameof(subscribe));
+        _unsubscribe = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));
+        _clock = clock ?? throw new ArgumentNullException(nameof(clock));
+    }
+
+    /// <summary>Number of probes currently advised. Test/dashboard hook.</summary>
+    public int ActiveProbeCount
+    {
+        get { lock (_lock) return _byProbe.Count; }
+    }
+
+    /// <summary>
+    /// Snapshot every currently-tracked host's state. One entry per probe.
+    /// </summary>
+    public IReadOnlyList<HostProbeSnapshot> SnapshotStates()
+    {
+        lock (_lock)
+        {
+            return _byProbe.Select(kv => new HostProbeSnapshot(
+                TagName: kv.Value.TagName,
+                State: kv.Value.State,
+                LastChangedUtc: kv.Value.LastStateChangeUtc)).ToList();
+        }
+    }
+
+    /// <summary>
+    /// Query the current runtime state for <paramref name="tagName"/>. Returns
+    /// <see cref="HostRuntimeState.Unknown"/> when the host is not tracked.
+    /// </summary>
+    public HostRuntimeState GetState(string tagName)
+    {
+        lock (_lock)
+        {
+            if (_probeByTagName.TryGetValue(tagName, out var probe)
+                && _byProbe.TryGetValue(probe, out var state))
+                return state.State;
+            return HostRuntimeState.Unknown;
+        }
+    }
+
+    /// <summary>
+    /// Diff the desired host set (filtered $WinPlatform / $AppEngine from the latest Discover)
+    /// against the currently-tracked set and advise / unadvise as needed. Idempotent:
+    /// calling twice with the same set does nothing. Blank tag names are skipped and
+    /// tag names that repeat (compared case-insensitively) collapse into a single probe.
+    /// </summary>
+    public async Task SyncAsync(IEnumerable<HostProbeTarget> desiredHosts)
+    {
+        if (_disposed) return;
+        if (desiredHosts is null) throw new ArgumentNullException(nameof(desiredHosts));
+
+        // A set rather than ToDictionary: a duplicated tag in the hierarchy must not throw and
+        // abort the whole sync, and only the tag names take part in the diff.
+        var desired = new HashSet<string>(
+            desiredHosts.Select(h => h.TagName).Where(tag => !string.IsNullOrWhiteSpace(tag)),
+            StringComparer.OrdinalIgnoreCase);
+
+        List<string> toAdvise;
+        List<KeyValuePair<string, string>> toUnadvise; // tag name → probe tag
+        lock (_lock)
+        {
+            toAdvise = desired
+                .Where(tag => !_probeByTagName.ContainsKey(tag))
+                .ToList();
+            toUnadvise = _probeByTagName
+                .Where(kv => !desired.Contains(kv.Key))
+                .ToList();
+
+            foreach (var tag in toAdvise)
+            {
+                var probe = tag + ProbeAttribute;
+                _probeByTagName[tag] = probe;
+                _byProbe[probe] = new HostProbeState
+                {
+                    TagName = tag,
+                    State = HostRuntimeState.Unknown,
+                    LastStateChangeUtc = _clock(),
+                };
+            }
+
+            foreach (var removed in toUnadvise)
+            {
+                _byProbe.Remove(removed.Value);
+                _probeByTagName.Remove(removed.Key);
+            }
+        }
+
+        foreach (var tag in toAdvise)
+        {
+            var probe = tag + ProbeAttribute;
+            try
+            {
+                await _subscribe(probe, OnProbeCallback).ConfigureAwait(false);
+            }
+            catch
+            {
+                // Rollback on subscribe failure so a later probe callback can't transition a
+                // never-advised probe into a false Stopped state. Callers can re-Sync later to retry.
+                lock (_lock)
+                {
+                    _byProbe.Remove(probe);
+                    _probeByTagName.Remove(tag);
+                }
+            }
+        }
+
+        foreach (var removed in toUnadvise)
+        {
+            await UnadviseQuietlyAsync(removed.Value).ConfigureAwait(false);
+        }
+    }
+
+    /// <summary>
+    /// Public entry point for tests and internal callbacks. Production flow: MxAccessClient's
+    /// SubscribeAsync delivers VTQ updates through the callback wired in <see cref="SyncAsync"/>,
+    /// which is this method: it updates state under the lock and fires <see cref="StateChanged"/>
+    /// outside the lock for any transition that matters.
+    /// </summary>
+    public void OnProbeCallback(string probeTag, Vtq vtq)
+    {
+        if (_disposed || probeTag is null) return;
+
+        HostStateTransition? transition = null;
+        lock (_lock)
+        {
+            if (!_byProbe.TryGetValue(probeTag, out var state)) return;
+
+            var isRunning = vtq.Quality >= GoodQualityThreshold && vtq.Value is bool b && b;
+            var now = _clock();
+            var previous = state.State;
+            state.LastCallbackUtc = now;
+
+            if (isRunning)
+            {
+                state.GoodUpdateCount++;
+                if (previous != HostRuntimeState.Running)
+                {
+                    state.State = HostRuntimeState.Running;
+                    state.LastStateChangeUtc = now;
+                    // Unknown → Running is the silent startup transition; only a real recovery fires.
+                    if (previous == HostRuntimeState.Stopped)
+                    {
+                        transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Running, now);
+                    }
+                }
+            }
+            else
+            {
+                state.FailureCount++;
+                if (previous != HostRuntimeState.Stopped)
+                {
+                    state.State = HostRuntimeState.Stopped;
+                    state.LastStateChangeUtc = now;
+                    transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Stopped, now);
+                }
+            }
+        }
+
+        if (transition is { } t)
+        {
+            StateChanged?.Invoke(this, t);
+        }
+    }
+
+    /// <summary>
+    /// Stops tracking every host and unadvises the probes that were still tracked. The
+    /// unadvise calls are started but not awaited (Dispose is synchronous) and their failures
+    /// are ignored. Safe to call more than once.
+    /// </summary>
+    public void Dispose()
+    {
+        List<string> advised;
+        lock (_lock)
+        {
+            if (_disposed) return;
+            _disposed = true;
+            advised = _byProbe.Keys.ToList();
+            _byProbe.Clear();
+            _probeByTagName.Clear();
+        }
+
+        foreach (var probe in advised)
+        {
+            _ = UnadviseQuietlyAsync(probe);
+        }
+    }
+
+    // Unadvise one probe, swallowing synchronous and asynchronous failures (best-effort cleanup).
+    private async Task UnadviseQuietlyAsync(string probe)
+    {
+        try { await _unsubscribe(probe).ConfigureAwait(false); }
+        catch { /* best-effort cleanup */ }
+    }
+
+    private sealed class HostProbeState
+    {
+        public string TagName { get; set; } = "";
+        public HostRuntimeState State { get; set; }
+        public DateTime LastStateChangeUtc { get; set; }
+        public DateTime? LastCallbackUtc { get; set; }
+        public long GoodUpdateCount { get; set; }
+        public long FailureCount { get; set; }
+    }
+}
+
+/// <summary>Runtime state tracked per probed host; Unknown until the first callback arrives.</summary>
+public enum HostRuntimeState
+{
+    Unknown,
+    Running,
+    Stopped,
+}
+
+/// <summary>Payload of StateChanged: which host moved from which state to which, and when.</summary>
+public sealed record HostStateTransition(
+    string TagName,
+    HostRuntimeState OldState,
+    HostRuntimeState NewState,
+    DateTime AtUtc);
+
+/// <summary>Point-in-time view of one tracked host, as returned by SnapshotStates.</summary>
+public sealed record HostProbeSnapshot(
+    string TagName,
+    HostRuntimeState State,
+    DateTime LastChangedUtc);
+
+/// <summary>A host to probe: its gobject tag name plus its Galaxy category id.</summary>
+public readonly record struct HostProbeTarget(string TagName, int CategoryId)
+{
+    public bool IsRuntimeHost =>
+        CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform
+        || CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine;
+}
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/GalaxyRuntimeProbeManagerTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/GalaxyRuntimeProbeManagerTests.cs
new file mode 100644
index 0000000..7a11318
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/GalaxyRuntimeProbeManagerTests.cs
@@ -0,0 +1,289 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
+
+[Trait("Category", "Unit")]
+public sealed class GalaxyRuntimeProbeManagerTests
+{
+    // In-memory stand-in for the subscribe/unsubscribe delegate pair the manager is given.
+    private sealed class FakeSubscriber
+    {
+        public readonly ConcurrentDictionary<string, Action<string, Vtq>> Subs = new();
+        public readonly ConcurrentQueue<string> UnsubCalls = new();
+        private int _subscribeCalls;
+
+        // Counts every successful Subscribe call, including repeats for the same probe.
+        public int SubscribeCalls => Volatile.Read(ref _subscribeCalls);
+        public bool FailSubscribeFor { get; set; }
+        public string? FailSubscribeTag { get; set; }
+
+        public Task Subscribe(string probe, Action<string, Vtq> cb)
+        {
+            if (FailSubscribeFor && string.Equals(probe, FailSubscribeTag, StringComparison.OrdinalIgnoreCase))
+                throw new InvalidOperationException("subscribe refused");
+            Interlocked.Increment(ref _subscribeCalls);
+            Subs[probe] = cb;
+            return Task.CompletedTask;
+        }
+
+        public Task Unsubscribe(string probe)
+        {
+            UnsubCalls.Enqueue(probe);
+            Subs.TryRemove(probe, out _);
+            return Task.CompletedTask;
+        }
+    }
+
+    private static Vtq Good(bool scanState) => new(scanState, DateTime.UtcNow, 192);
+    private static Vtq Bad() => new(null, DateTime.UtcNow, 0);
+
+    [Fact]
+    public async Task Sync_subscribes_to_ScanState_per_host()
+    {
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
+
+        await mgr.SyncAsync(new[]
+        {
+            new HostProbeTarget("PlatformA", GalaxyRuntimeProbeManager.CategoryWinPlatform),
+            new HostProbeTarget("EngineB", GalaxyRuntimeProbeManager.CategoryAppEngine),
+        });
+
+        mgr.ActiveProbeCount.ShouldBe(2);
+        subs.Subs.ShouldContainKey("PlatformA.ScanState");
+        subs.Subs.ShouldContainKey("EngineB.ScanState");
+    }
+
+    [Fact]
+    public async Task Sync_is_idempotent_on_repeat_call_with_same_set()
+    {
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
+        var targets = new[] { new HostProbeTarget("PlatformA", 1) };
+
+        await mgr.SyncAsync(targets);
+        await mgr.SyncAsync(targets);
+
+        mgr.ActiveProbeCount.ShouldBe(1);
+        subs.Subs.Count.ShouldBe(1);
+        subs.SubscribeCalls.ShouldBe(1); // Subs is keyed by probe, so only the counter catches a re-subscribe
+        subs.UnsubCalls.Count.ShouldBe(0);
+    }
+
+    [Fact]
+    public async Task Sync_collapses_duplicate_and_blank_tag_names_instead_of_throwing()
+    {
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
+
+        await mgr.SyncAsync(new[]
+        {
+            new HostProbeTarget("PlatformA", 1),
+            new HostProbeTarget("platforma", 1), // same host, different casing
+            new HostProbeTarget("  ", 1),        // blank tag is skipped
+        });
+
+        mgr.ActiveProbeCount.ShouldBe(1);
+        subs.SubscribeCalls.ShouldBe(1);
+    }
+
+    [Fact]
+    public async Task Sync_unadvises_removed_hosts()
+    {
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
+
+        await mgr.SyncAsync(new[]
+        {
+            new HostProbeTarget("PlatformA", 1),
+            new HostProbeTarget("PlatformB", 1),
+        });
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+
+        mgr.ActiveProbeCount.ShouldBe(1);
+        subs.UnsubCalls.ShouldContain("PlatformB.ScanState");
+    }
+
+    [Fact]
+    public async Task Subscribe_failure_rolls_back_host_entry_so_later_transitions_do_not_fire_stale_events()
+    {
+        var subs = new FakeSubscriber { FailSubscribeFor = true, FailSubscribeTag = "PlatformA.ScanState" };
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
+        var transitions = new ConcurrentQueue<HostStateTransition>();
+        mgr.StateChanged += (_, t) => transitions.Enqueue(t);
+
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+
+        mgr.ActiveProbeCount.ShouldBe(0); // rolled back
+        mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Unknown);
+
+        // A stray callback for the never-advised probe must not produce a host-down event.
+        mgr.OnProbeCallback("PlatformA.ScanState", Bad());
+        transitions.Count.ShouldBe(0);
+
+        // Once the subscriber recovers, a re-Sync advises the host again.
+        subs.FailSubscribeFor = false;
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+        mgr.ActiveProbeCount.ShouldBe(1);
+    }
+
+    [Fact]
+    public async Task Unknown_to_Running_does_not_fire_StateChanged()
+    {
+        var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
+        var transitions = new ConcurrentQueue<HostStateTransition>();
+        mgr.StateChanged += (_, t) => transitions.Enqueue(t);
+
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));
+
+        mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Running);
+        transitions.Count.ShouldBe(0); // startup transition, operators don't care
+    }
+
+    [Fact]
+    public async Task Running_to_Stopped_fires_StateChanged_with_both_states()
+    {
+        var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
+        var transitions = new ConcurrentQueue<HostStateTransition>();
+        mgr.StateChanged += (_, t) => transitions.Enqueue(t);
+
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Unknown→Running (silent)
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(false)); // Running→Stopped (fires)
+
+        transitions.Count.ShouldBe(1);
+        transitions.TryDequeue(out var t).ShouldBeTrue();
+        t!.TagName.ShouldBe("PlatformA");
+        t.OldState.ShouldBe(HostRuntimeState.Running);
+        t.NewState.ShouldBe(HostRuntimeState.Stopped);
+    }
+
+    [Fact]
+    public async Task Stopped_to_Running_fires_StateChanged_for_recovery()
+    {
+        var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
+        var transitions = new ConcurrentQueue<HostStateTransition>();
+        mgr.StateChanged += (_, t) => transitions.Enqueue(t);
+
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Unknown→Running (silent)
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(false)); // Running→Stopped (fires)
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Stopped→Running (fires)
+
+        transitions.Count.ShouldBe(2);
+    }
+
+    [Fact]
+    public async Task Unknown_to_Stopped_fires_StateChanged_for_first_known_bad_signal()
+    {
+        var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
+        var transitions = new ConcurrentQueue<HostStateTransition>();
+        mgr.StateChanged += (_, t) => transitions.Enqueue(t);
+
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+        // First callback is bad-quality — we must flag the host Stopped so operators see it.
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Bad());
+
+        transitions.Count.ShouldBe(1);
+        transitions.TryDequeue(out var t).ShouldBeTrue();
+        t!.OldState.ShouldBe(HostRuntimeState.Unknown);
+        t.NewState.ShouldBe(HostRuntimeState.Stopped);
+    }
+
+    [Fact]
+    public async Task Repeated_Good_Running_callbacks_do_not_fire_duplicate_events()
+    {
+        var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
+        var count = 0;
+        mgr.StateChanged += (_, _) => Interlocked.Increment(ref count);
+
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+        for (var i = 0; i < 5; i++)
+            subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));
+
+        count.ShouldBe(0); // only the silent Unknown→Running on the first, no events after
+    }
+
+    [Fact]
+    public void Unknown_callback_for_non_tracked_probe_is_dropped()
+    {
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
+        var fired = 0;
+        mgr.StateChanged += (_, _) => Interlocked.Increment(ref fired);
+
+        mgr.OnProbeCallback("ProbeForSomeoneElse.ScanState", Bad());
+
+        mgr.ActiveProbeCount.ShouldBe(0);
+        fired.ShouldBe(0); // a tracked Unknown host would have fired Unknown→Stopped here
+    }
+
+    [Fact]
+    public async Task Snapshot_reports_current_state_for_every_tracked_host()
+    {
+        var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
+
+        await mgr.SyncAsync(new[]
+        {
+            new HostProbeTarget("PlatformA", 1),
+            new HostProbeTarget("EngineB", 3),
+        });
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Running
+        subs.Subs["EngineB.ScanState"]("EngineB.ScanState", Bad()); // Stopped
+
+        var snap = mgr.SnapshotStates();
+        snap.Count.ShouldBe(2);
+        snap.ShouldContain(s => s.TagName == "PlatformA" && s.State == HostRuntimeState.Running);
+        snap.ShouldContain(s => s.TagName == "EngineB" && s.State == HostRuntimeState.Stopped);
+    }
+
+    [Fact]
+    public async Task Dispose_unadvises_every_probe_still_tracked()
+    {
+        var subs = new FakeSubscriber();
+        var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
+        await mgr.SyncAsync(new[]
+        {
+            new HostProbeTarget("PlatformA", 1),
+            new HostProbeTarget("EngineB", 3),
+        });
+
+        mgr.Dispose();
+        mgr.Dispose(); // second call must be a no-op
+
+        mgr.ActiveProbeCount.ShouldBe(0);
+        subs.UnsubCalls.Count.ShouldBe(2);
+        subs.UnsubCalls.ShouldContain("PlatformA.ScanState");
+        subs.UnsubCalls.ShouldContain("EngineB.ScanState");
+    }
+
+    [Fact]
+    public void IsRuntimeHost_recognizes_WinPlatform_and_AppEngine_category_ids()
+    {
+        new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryWinPlatform).IsRuntimeHost.ShouldBeTrue();
+        new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryAppEngine).IsRuntimeHost.ShouldBeTrue();
+        new HostProbeTarget("X", 4 /* $Area */).IsRuntimeHost.ShouldBeFalse();
+        new HostProbeTarget("X", 11 /* $ApplicationObject */).IsRuntimeHost.ShouldBeFalse();
+    }
+}