diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs index a537263..86e520b 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccessGalaxyBackend.cs @@ -7,6 +7,7 @@ using MessagePack; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Galaxy; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Historian; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend; @@ -40,6 +41,8 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable public event System.EventHandler? OnHostStatusChanged; private readonly System.EventHandler _onConnectionStateChanged; + private readonly GalaxyRuntimeProbeManager _probeManager; + private readonly System.EventHandler _onProbeStateChanged; public MxAccessGalaxyBackend(GalaxyRepository repository, MxAccessClient mx, IHistorianDataSource? historian = null) { @@ -62,8 +65,39 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable }); }; _mx.ConnectionStateChanged += _onConnectionStateChanged; + + // PR 13: per-platform runtime probes. ScanState subscriptions fire OnProbeCallback, + // which runs the state machine and raises StateChanged on transitions we care about. + // We forward each transition through the same OnHostStatusChanged IPC event that the + // gateway-level ConnectionStateChanged uses — tagged with the platform's TagName so the + // Admin UI can show per-host health independently from the top-level transport status. 
+ _probeManager = new GalaxyRuntimeProbeManager( + subscribe: (probe, cb) => _mx.SubscribeAsync(probe, cb), + unsubscribe: probe => _mx.UnsubscribeAsync(probe)); + _onProbeStateChanged = (_, t) => + { + OnHostStatusChanged?.Invoke(this, new HostConnectivityStatus + { + HostName = t.TagName, + RuntimeStatus = t.NewState switch + { + HostRuntimeState.Running => "Running", + HostRuntimeState.Stopped => "Stopped", + _ => "Unknown", + }, + LastObservedUtcUnixMs = new DateTimeOffset(t.AtUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(), + }); + }; + _probeManager.StateChanged += _onProbeStateChanged; } + /// + /// Exposed for tests. Production flow: DiscoverAsync completes → backend calls + /// SyncProbesAsync with the runtime hosts (WinPlatform + AppEngine gobjects) to + /// advise ScanState per host. + /// + internal GalaxyRuntimeProbeManager ProbeManager => _probeManager; + public async Task OpenSessionAsync(OpenSessionRequest req, CancellationToken ct) { try @@ -103,6 +137,21 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable Attributes = attrsByGobject.TryGetValue(o.GobjectId, out var a) ? a : Array.Empty(), }).ToArray(); + // PR 13: Sync the per-platform probe manager against the just-discovered hierarchy + // so ScanState subscriptions track the current runtime set. Best-effort — probe + // failures don't block Discover from returning, since the gateway-level signal from + // MxAccessClient.ConnectionStateChanged still flows and the Admin UI degrades to + // that level if any per-host probe couldn't advise. 
+ try + { + var targets = hierarchy + .Where(o => o.CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform + || o.CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine) + .Select(o => new HostProbeTarget(o.TagName, o.CategoryId)); + await _probeManager.SyncAsync(targets).ConfigureAwait(false); + } + catch { /* swallow — Discover succeeded; probes are a diagnostic enrichment */ } + return new DiscoverHierarchyResponse { Success = true, Objects = objects }; } catch (Exception ex) @@ -405,6 +454,8 @@ public sealed class MxAccessGalaxyBackend : IGalaxyBackend, IDisposable public void Dispose() { + _probeManager.StateChanged -= _onProbeStateChanged; + _probeManager.Dispose(); _mx.ConnectionStateChanged -= _onConnectionStateChanged; _historian?.Dispose(); } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Stability/GalaxyRuntimeProbeManager.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Stability/GalaxyRuntimeProbeManager.cs new file mode 100644 index 0000000..f3a2612 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/Stability/GalaxyRuntimeProbeManager.cs @@ -0,0 +1,273 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability; + +/// +/// Per-platform + per-AppEngine runtime probe. Subscribes to <TagName>.ScanState +/// for each $WinPlatform and $AppEngine gobject, tracks Unknown → Running → Stopped +/// transitions, and fires so +/// can forward per-host events through the existing IPC OnHostStatusChanged event. +/// Pure-logic state machine with an injected clock so it's deterministically testable — +/// port of v1 GalaxyRuntimeProbeManager without the OPC UA node-manager coupling. 
+/// +/// +/// State machine rules (documented in v1's runtimestatus.md and preserved here): +/// +/// ScanState is on-change-only — a stably-Running host may go hours without a +/// callback. Running → Stopped is driven by an explicit ScanState=false callback, +/// never by starvation. +/// Unknown → Running is a startup transition and does NOT fire StateChanged (would +/// paint every host as "just recovered" at startup, which is noise). +/// Stopped → Running and Running → Stopped fire StateChanged. Unknown → Stopped +/// fires StateChanged because that's a first-known-bad signal operators need. +/// All public methods are thread-safe. Callbacks fire outside the internal lock to +/// avoid lock inversion with caller-owned state. +/// +/// +public sealed class GalaxyRuntimeProbeManager : IDisposable +{ + public const int CategoryWinPlatform = 1; + public const int CategoryAppEngine = 3; + public const string ProbeAttribute = ".ScanState"; + + private readonly Func _clock; + private readonly Func, Task> _subscribe; + private readonly Func _unsubscribe; + private readonly object _lock = new(); + + // probe tag → per-host state + private readonly Dictionary _byProbe = new(StringComparer.OrdinalIgnoreCase); + // tag name → probe tag (for reverse lookup on the desired-set diff) + private readonly Dictionary _probeByTagName = new(StringComparer.OrdinalIgnoreCase); + private bool _disposed; + + /// + /// Fires on every state transition that operators should react to. See class remarks + /// for the rules on which transitions fire. + /// + public event EventHandler? StateChanged; + + public GalaxyRuntimeProbeManager( + Func, Task> subscribe, + Func unsubscribe) + : this(subscribe, unsubscribe, () => DateTime.UtcNow) { } + + internal GalaxyRuntimeProbeManager( + Func, Task> subscribe, + Func unsubscribe, + Func clock) + { + _subscribe = subscribe ?? throw new ArgumentNullException(nameof(subscribe)); + _unsubscribe = unsubscribe ?? 
throw new ArgumentNullException(nameof(unsubscribe)); + _clock = clock ?? throw new ArgumentNullException(nameof(clock)); + } + + /// Number of probes currently advised. Test/dashboard hook. + public int ActiveProbeCount + { + get { lock (_lock) return _byProbe.Count; } + } + + /// + /// Snapshot every currently-tracked host's state. One entry per probe. + /// + public IReadOnlyList SnapshotStates() + { + lock (_lock) + { + return _byProbe.Select(kv => new HostProbeSnapshot( + TagName: kv.Value.TagName, + State: kv.Value.State, + LastChangedUtc: kv.Value.LastStateChangeUtc)).ToList(); + } + } + + /// + /// Query the current runtime state for . Returns + /// when the host is not tracked. + /// + public HostRuntimeState GetState(string tagName) + { + lock (_lock) + { + if (_probeByTagName.TryGetValue(tagName, out var probe) + && _byProbe.TryGetValue(probe, out var state)) + return state.State; + return HostRuntimeState.Unknown; + } + } + + /// + /// Diff the desired host set (filtered $WinPlatform / $AppEngine from the latest Discover) + /// against the currently-tracked set and advise / unadvise as needed. Idempotent: + /// calling twice with the same set does nothing. 
+ /// + public async Task SyncAsync(IEnumerable desiredHosts) + { + if (_disposed) return; + + var desired = desiredHosts + .Where(h => !string.IsNullOrWhiteSpace(h.TagName)) + .ToDictionary(h => h.TagName, StringComparer.OrdinalIgnoreCase); + + List toAdvise; + List toUnadvise; + lock (_lock) + { + toAdvise = desired.Keys + .Where(tag => !_probeByTagName.ContainsKey(tag)) + .ToList(); + toUnadvise = _probeByTagName.Keys + .Where(tag => !desired.ContainsKey(tag)) + .Select(tag => _probeByTagName[tag]) + .ToList(); + + foreach (var tag in toAdvise) + { + var probe = tag + ProbeAttribute; + _probeByTagName[tag] = probe; + _byProbe[probe] = new HostProbeState + { + TagName = tag, + State = HostRuntimeState.Unknown, + LastStateChangeUtc = _clock(), + }; + } + + foreach (var probe in toUnadvise) + { + _byProbe.Remove(probe); + } + + foreach (var removedTag in _probeByTagName.Keys.Where(t => !desired.ContainsKey(t)).ToList()) + { + _probeByTagName.Remove(removedTag); + } + } + + foreach (var tag in toAdvise) + { + var probe = tag + ProbeAttribute; + try + { + await _subscribe(probe, OnProbeCallback); + } + catch + { + // Rollback on subscribe failure so a later Tick can't transition a never-advised + // probe into a false Stopped state. Callers can re-Sync later to retry. + lock (_lock) + { + _byProbe.Remove(probe); + _probeByTagName.Remove(tag); + } + } + } + + foreach (var probe in toUnadvise) + { + try { await _unsubscribe(probe); } catch { /* best-effort cleanup */ } + } + } + + /// + /// Public entry point for tests and internal callbacks. Production flow: MxAccessClient's + /// SubscribeAsync delivers VTQ updates through the callback wired in , + /// which calls this method under the lock to update state and fires + /// outside the lock for any transition that matters. + /// + public void OnProbeCallback(string probeTag, Vtq vtq) + { + if (_disposed) return; + + HostStateTransition? 
transition = null; + lock (_lock) + { + if (!_byProbe.TryGetValue(probeTag, out var state)) return; + + var isRunning = vtq.Quality >= 192 && vtq.Value is bool b && b; + var now = _clock(); + var previous = state.State; + state.LastCallbackUtc = now; + + if (isRunning) + { + state.GoodUpdateCount++; + if (previous != HostRuntimeState.Running) + { + state.State = HostRuntimeState.Running; + state.LastStateChangeUtc = now; + if (previous == HostRuntimeState.Stopped) + { + transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Running, now); + } + } + } + else + { + state.FailureCount++; + if (previous != HostRuntimeState.Stopped) + { + state.State = HostRuntimeState.Stopped; + state.LastStateChangeUtc = now; + transition = new HostStateTransition(state.TagName, previous, HostRuntimeState.Stopped, now); + } + } + } + + if (transition is { } t) + { + StateChanged?.Invoke(this, t); + } + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + lock (_lock) + { + _byProbe.Clear(); + _probeByTagName.Clear(); + } + } + + private sealed class HostProbeState + { + public string TagName { get; set; } = ""; + public HostRuntimeState State { get; set; } + public DateTime LastStateChangeUtc { get; set; } + public DateTime? 
LastCallbackUtc { get; set; } + public long GoodUpdateCount { get; set; } + public long FailureCount { get; set; } + } +} + +public enum HostRuntimeState +{ + Unknown, + Running, + Stopped, +} + +public sealed record HostStateTransition( + string TagName, + HostRuntimeState OldState, + HostRuntimeState NewState, + DateTime AtUtc); + +public sealed record HostProbeSnapshot( + string TagName, + HostRuntimeState State, + DateTime LastChangedUtc); + +public readonly record struct HostProbeTarget(string TagName, int CategoryId) +{ + public bool IsRuntimeHost => + CategoryId == GalaxyRuntimeProbeManager.CategoryWinPlatform + || CategoryId == GalaxyRuntimeProbeManager.CategoryAppEngine; +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/GalaxyRuntimeProbeManagerTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/GalaxyRuntimeProbeManagerTests.cs new file mode 100644 index 0000000..7a11318 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/GalaxyRuntimeProbeManagerTests.cs @@ -0,0 +1,231 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests; + +[Trait("Category", "Unit")] +public sealed class GalaxyRuntimeProbeManagerTests +{ + private sealed class FakeSubscriber + { + public readonly ConcurrentDictionary> Subs = new(); + public readonly ConcurrentQueue UnsubCalls = new(); + public bool FailSubscribeFor { get; set; } + public string? 
FailSubscribeTag { get; set; }
+
+        public Task Subscribe(string probe, Action<string, Vtq> cb)
+        {
+            if (FailSubscribeFor && string.Equals(probe, FailSubscribeTag, StringComparison.OrdinalIgnoreCase))
+                throw new InvalidOperationException("subscribe refused");
+            Subs[probe] = cb;
+            return Task.CompletedTask;
+        }
+
+        public Task Unsubscribe(string probe)
+        {
+            UnsubCalls.Enqueue(probe);
+            Subs.TryRemove(probe, out _);
+            return Task.CompletedTask;
+        }
+    }
+
+    private static Vtq Good(bool scanState) => new(scanState, DateTime.UtcNow, 192);
+    private static Vtq Bad() => new(null, DateTime.UtcNow, 0);
+
+    [Fact]
+    public async Task Sync_subscribes_to_ScanState_per_host()
+    {
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
+
+        await mgr.SyncAsync(new[]
+        {
+            new HostProbeTarget("PlatformA", GalaxyRuntimeProbeManager.CategoryWinPlatform),
+            new HostProbeTarget("EngineB", GalaxyRuntimeProbeManager.CategoryAppEngine),
+        });
+
+        mgr.ActiveProbeCount.ShouldBe(2);
+        subs.Subs.ShouldContainKey("PlatformA.ScanState");
+        subs.Subs.ShouldContainKey("EngineB.ScanState");
+    }
+
+    [Fact]
+    public async Task Sync_is_idempotent_on_repeat_call_with_same_set()
+    {
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
+        var targets = new[] { new HostProbeTarget("PlatformA", 1) };
+
+        await mgr.SyncAsync(targets);
+        await mgr.SyncAsync(targets);
+
+        mgr.ActiveProbeCount.ShouldBe(1);
+        subs.Subs.Count.ShouldBe(1);
+        subs.UnsubCalls.Count.ShouldBe(0);
+    }
+
+    [Fact]
+    public async Task Sync_unadvises_removed_hosts()
+    {
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
+
+        await mgr.SyncAsync(new[]
+        {
+            new HostProbeTarget("PlatformA", 1),
+            new HostProbeTarget("PlatformB", 1),
+        });
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+
+        mgr.ActiveProbeCount.ShouldBe(1);
+        subs.UnsubCalls.ShouldContain("PlatformB.ScanState");
+    }
+
+    [Fact]
+    public async Task Subscribe_failure_rolls_back_host_entry_so_later_transitions_do_not_fire_stale_events()
+    {
+        var subs = new FakeSubscriber { FailSubscribeFor = true, FailSubscribeTag = "PlatformA.ScanState" };
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
+
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+
+        mgr.ActiveProbeCount.ShouldBe(0); // rolled back
+        mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Unknown);
+    }
+
+    [Fact]
+    public async Task Unknown_to_Running_does_not_fire_StateChanged()
+    {
+        var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
+        var transitions = new ConcurrentQueue<HostStateTransition>();
+        mgr.StateChanged += (_, t) => transitions.Enqueue(t);
+
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));
+
+        mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Running);
+        transitions.Count.ShouldBe(0); // startup transition, operators don't care
+    }
+
+    [Fact]
+    public async Task Running_to_Stopped_fires_StateChanged_with_both_states()
+    {
+        var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
+        var transitions = new ConcurrentQueue<HostStateTransition>();
+        mgr.StateChanged += (_, t) => transitions.Enqueue(t);
+
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Unknown→Running (silent)
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(false)); // Running→Stopped (fires)
+
+        transitions.Count.ShouldBe(1);
+        transitions.TryDequeue(out var t).ShouldBeTrue();
+        t!.TagName.ShouldBe("PlatformA");
+        t.OldState.ShouldBe(HostRuntimeState.Running);
+        t.NewState.ShouldBe(HostRuntimeState.Stopped);
+    }
+
+    [Fact]
+    public async Task Stopped_to_Running_fires_StateChanged_for_recovery()
+    {
+        var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
+        var transitions = new ConcurrentQueue<HostStateTransition>();
+        mgr.StateChanged += (_, t) => transitions.Enqueue(t);
+
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Unknown→Running (silent)
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(false)); // Running→Stopped (fires)
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Stopped→Running (fires)
+
+        transitions.Count.ShouldBe(2);
+    }
+
+    [Fact]
+    public async Task Unknown_to_Stopped_fires_StateChanged_for_first_known_bad_signal()
+    {
+        var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
+        var transitions = new ConcurrentQueue<HostStateTransition>();
+        mgr.StateChanged += (_, t) => transitions.Enqueue(t);
+
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+        // First callback is bad-quality — we must flag the host Stopped so operators see it.
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Bad());
+
+        transitions.Count.ShouldBe(1);
+        transitions.TryDequeue(out var t).ShouldBeTrue();
+        t!.OldState.ShouldBe(HostRuntimeState.Unknown);
+        t.NewState.ShouldBe(HostRuntimeState.Stopped);
+    }
+
+    [Fact]
+    public async Task Repeated_Good_Running_callbacks_do_not_fire_duplicate_events()
+    {
+        var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
+        var count = 0;
+        mgr.StateChanged += (_, _) => Interlocked.Increment(ref count);
+
+        await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });
+        for (var i = 0; i < 5; i++)
+            subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));
+
+        count.ShouldBe(0); // only the silent Unknown→Running on the first, no events after
+    }
+
+    [Fact]
+    public void Unknown_callback_for_non_tracked_probe_is_dropped()
+    {
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
+
+        mgr.OnProbeCallback("ProbeForSomeoneElse.ScanState", Good(true)); // nothing was synced, so no entry exists
+
+        mgr.ActiveProbeCount.ShouldBe(0);
+    }
+
+    [Fact]
+    public async Task Snapshot_reports_current_state_for_every_tracked_host()
+    {
+        var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
+        var subs = new FakeSubscriber();
+        using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
+
+        await mgr.SyncAsync(new[]
+        {
+            new HostProbeTarget("PlatformA", 1),
+            new HostProbeTarget("EngineB", 3),
+        });
+        subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true)); // Running
+        subs.Subs["EngineB.ScanState"]("EngineB.ScanState", Bad()); // Stopped
+
+        var snap = mgr.SnapshotStates();
+        snap.Count.ShouldBe(2);
+        snap.ShouldContain(s => s.TagName == "PlatformA" && s.State == HostRuntimeState.Running);
+        snap.ShouldContain(s => s.TagName == "EngineB" && s.State == HostRuntimeState.Stopped);
+    }
+
+    [Fact]
+    public void IsRuntimeHost_recognizes_WinPlatform_and_AppEngine_category_ids()
+    {
+        new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryWinPlatform).IsRuntimeHost.ShouldBeTrue();
+        new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryAppEngine).IsRuntimeHost.ShouldBeTrue();
+        new HostProbeTarget("X", 4 /* $Area */).IsRuntimeHost.ShouldBeFalse();
+        new HostProbeTarget("X", 11 /* $ApplicationObject */).IsRuntimeHost.ShouldBeFalse();
+    }
+}