Files
lmxopcua/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/GalaxyRuntimeProbeManagerTests.cs
Joseph Doherty 04d267d1ea Phase 2 PR 13 — port GalaxyRuntimeProbeManager state machine + wire per-platform ScanState probing. PR 8 gave operators the gateway-level transport signal (MxAccessClient.ConnectionStateChanged → OnHostStatusChanged tagged with the Wonderware client identity) — enough to detect when the entire MXAccess COM proxy dies, but silent when a specific WinPlatform or AppEngine goes down while the gateway stays alive. This PR closes that gap: a pure-logic GalaxyRuntimeProbeManager (ported from v1's 472-LOC GalaxyRuntimeProbeManager.cs, distilled to ~240 LOC of state machine without the OPC UA node-manager entanglement) lives in Backend/Stability/, advises <TagName>.ScanState per WinPlatform + AppEngine gobject discovered during DiscoverAsync, runs the Unknown → Running → Stopped state machine with v1's documented semantics preserved verbatim, and raises a StateChanged event on transitions operators should react to. MxAccessGalaxyBackend instantiates the probe manager in the constructor with SubscribeAsync/UnsubscribeAsync delegate pointers into MxAccessClient, hooks StateChanged to forward each transition through the same OnHostStatusChanged IPC event the gateway signal uses (HostName = platform/engine TagName, RuntimeStatus = 'Running'|'Stopped'|'Unknown', LastObservedUtcUnixMs from the state-change timestamp), so Admin UI gets per-host signals flowing through the existing PR 8 wire with no additional IPC plumbing. 
State machine rules ported from v1 runtimestatus.md: (a) ScanState is on-change-only — a stably-Running host may go hours without a callback, so Running → Stopped is driven only by explicit ScanState=false, never by starvation; (b) Unknown → Running is a startup transition and does NOT fire StateChanged (would paint every host as 'just recovered' at startup, which is noise and can clear Bad quality set by a concurrently-stopping sibling); (c) Stopped → Running fires StateChanged for the real recovery case; (d) Running → Stopped fires StateChanged; (e) Unknown → Stopped fires StateChanged because that's the first-known-bad signal operators need when a host is down at our startup time. MxAccessGalaxyBackend.DiscoverAsync calls _probeManager.SyncAsync with the runtime-host subset of the hierarchy (CategoryId == 1 WinPlatform or 3 AppEngine) as a best-effort step after building the Discover response — probe failures are swallowed so Discover still returns the hierarchy even if a per-host advise fails; the gateway signal covers the critical rung. SyncAsync is idempotent (second call with the same set is a no-op) and handles the diff on re-Discover for tag rename / host add / host remove. Subscribe failure rolls back the host's state entry under the lock so a later probe callback for a never-advised tag can't transition a phantom state from Unknown to Stopped and fan out a false host-down signal (the same protection v1's GalaxyRuntimeProbeManager had at line 237-243 of v1 with a captured-probe-string comparison under the lock). MxAccessGalaxyBackend.Dispose unsubscribes the StateChanged handler before disposing the probe manager to prevent dangling invocation-list references across reconnects, same discipline as PR 8's ConnectionStateChanged and PR 6's SubscriptionReplayFailed. 
Tests (12 new GalaxyRuntimeProbeManagerTests): Sync_subscribes_to_ScanState_per_host verifies tag.ScanState subscriptions are advised per Platform/Engine; Sync_is_idempotent_on_repeat_call_with_same_set verifies no duplicate subscribes; Sync_unadvises_removed_hosts verifies the diff unadvises gone hosts; Subscribe_failure_rolls_back_host_entry_so_later_transitions_do_not_fire_stale_events covers the rollback-on-subscribe-fail guard; Unknown_to_Running_does_not_fire_StateChanged preserves the startup-noise rule; Running_to_Stopped_fires_StateChanged_with_both_states asserts OldState and NewState are both captured in the transition record; Stopped_to_Running_fires_StateChanged_for_recovery verifies the recovery case; Unknown_to_Stopped_fires_StateChanged_for_first_known_bad_signal preserves the first-bad rule; Repeated_Good_Running_callbacks_do_not_fire_duplicate_events verifies the state-tracking de-dup; Unknown_callback_for_non_tracked_probe_is_dropped asserts a foreign callback is silently ignored; Snapshot_reports_current_state_for_every_tracked_host covers the dashboard query hook; IsRuntimeHost_recognizes_WinPlatform_and_AppEngine_category_ids asserts the CategoryId filter. Galaxy.Host.Tests Unit suite 75 pass / 0 fail (12 new probe + 63 pre-existing). Galaxy.Host builds clean (0 errors / 0 warnings). Branches off v2.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 07:27:56 -04:00

232 lines
9.4 KiB
C#

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.Stability;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class GalaxyRuntimeProbeManagerTests
{
/// <summary>
/// In-memory stand-in for the subscribe/unsubscribe delegates handed to
/// <see cref="GalaxyRuntimeProbeManager"/>. It records every registered probe callback and every
/// unsubscribe call so tests can assert on them and replay callbacks by hand.
/// </summary>
private sealed class FakeSubscriber
{
    /// <summary>Callbacks currently registered, keyed by the probe string passed to <see cref="Subscribe"/>.</summary>
    public readonly ConcurrentDictionary<string, Action<string, Vtq>> Subs = new();

    /// <summary>Every probe string passed to <see cref="Unsubscribe"/>, in call order.</summary>
    public readonly ConcurrentQueue<string> UnsubCalls = new();

    /// <summary>Enables failure injection; only takes effect for the probe named by <see cref="FailSubscribeTag"/>.</summary>
    public bool FailSubscribeFor { get; set; }

    /// <summary>Probe string (compared case-insensitively) whose subscribe attempt is refused.</summary>
    public string? FailSubscribeTag { get; set; }

    /// <summary>
    /// Registers <paramref name="cb"/> under <paramref name="probe"/>, replacing any earlier callback.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown synchronously (not via a faulted task) when failure injection targets this probe.
    /// </exception>
    public Task Subscribe(string probe, Action<string, Vtq> cb)
    {
        var refuse = FailSubscribeFor
            && string.Equals(probe, FailSubscribeTag, StringComparison.OrdinalIgnoreCase);
        if (refuse)
        {
            throw new InvalidOperationException("subscribe refused");
        }

        Subs[probe] = cb;
        return Task.CompletedTask;
    }

    /// <summary>
    /// Logs the call in <see cref="UnsubCalls"/> and forgets the callback registered for
    /// <paramref name="probe"/>, if there is one.
    /// </summary>
    public Task Unsubscribe(string probe)
    {
        UnsubCalls.Enqueue(probe);
        Subs.TryRemove(probe, out var _);
        return Task.CompletedTask;
    }
}
/// <summary>
/// Builds a sample carrying <paramref name="scanState"/>, stamped with the current UTC time and
/// the code 192.
/// </summary>
/// <remarks>
/// NOTE(review): the arguments are presumably (value, timestamp, quality) and 192 is presumably the
/// "good" quality code — confirm against the <c>Vtq</c> definition.
/// </remarks>
private static Vtq Good(bool scanState)
{
    return new Vtq(scanState, DateTime.UtcNow, 192);
}

/// <summary>
/// Builds a sample with a <c>null</c> first argument, the current UTC time and the code 0; the
/// tests use it as the bad-quality callback.
/// </summary>
private static Vtq Bad()
{
    return new Vtq(null, DateTime.UtcNow, 0);
}
/// <summary>
/// Syncing one WinPlatform and one AppEngine target registers a <c>&lt;TagName&gt;.ScanState</c>
/// subscription for each and reports two active probes.
/// </summary>
[Fact]
public async Task Sync_subscribes_to_ScanState_per_host()
{
    var fake = new FakeSubscriber();
    using var manager = new GalaxyRuntimeProbeManager(fake.Subscribe, fake.Unsubscribe);
    var platform = new HostProbeTarget("PlatformA", GalaxyRuntimeProbeManager.CategoryWinPlatform);
    var engine = new HostProbeTarget("EngineB", GalaxyRuntimeProbeManager.CategoryAppEngine);

    await manager.SyncAsync(new[] { platform, engine });

    manager.ActiveProbeCount.ShouldBe(2);
    fake.Subs.ShouldContainKey("PlatformA.ScanState");
    fake.Subs.ShouldContainKey("EngineB.ScanState");
}
/// <summary>
/// Calling <c>SyncAsync</c> twice with the same target set leaves exactly one probe and one
/// subscription, and triggers no unsubscribe.
/// </summary>
[Fact]
public async Task Sync_is_idempotent_on_repeat_call_with_same_set()
{
    var fake = new FakeSubscriber();
    using var manager = new GalaxyRuntimeProbeManager(fake.Subscribe, fake.Unsubscribe);
    HostProbeTarget[] sameSet = { new HostProbeTarget("PlatformA", 1) };

    // The very same array instance is handed over on both calls.
    await manager.SyncAsync(sameSet);
    await manager.SyncAsync(sameSet);

    manager.ActiveProbeCount.ShouldBe(1);
    fake.Subs.Count.ShouldBe(1);
    fake.UnsubCalls.Count.ShouldBe(0);
}
/// <summary>
/// When a later sync omits a previously tracked host, that host's <c>.ScanState</c> probe is
/// unsubscribed and only the remaining host stays active.
/// </summary>
[Fact]
public async Task Sync_unadvises_removed_hosts()
{
    var fake = new FakeSubscriber();
    using var manager = new GalaxyRuntimeProbeManager(fake.Subscribe, fake.Unsubscribe);
    var initialHosts = new[]
    {
        new HostProbeTarget("PlatformA", 1),
        new HostProbeTarget("PlatformB", 1),
    };
    var remainingHosts = new[] { new HostProbeTarget("PlatformA", 1) };

    await manager.SyncAsync(initialHosts);
    await manager.SyncAsync(remainingHosts);

    manager.ActiveProbeCount.ShouldBe(1);
    fake.UnsubCalls.ShouldContain("PlatformB.ScanState");
}
/// <summary>
/// A refused subscribe must roll the host entry back: no probe stays active, the host reads as
/// Unknown, and a callback that arrives later for the never-advised probe raises no
/// <c>StateChanged</c> event.
/// </summary>
[Fact]
public async Task Subscribe_failure_rolls_back_host_entry_so_later_transitions_do_not_fire_stale_events()
{
    var subs = new FakeSubscriber { FailSubscribeFor = true, FailSubscribeTag = "PlatformA.ScanState" };
    using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
    var transitions = new ConcurrentQueue<HostStateTransition>();
    mgr.StateChanged += (_, transition) => transitions.Enqueue(transition);

    await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });

    mgr.ActiveProbeCount.ShouldBe(0); // rolled back
    mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Unknown);

    // The fake never stored a callback for the refused probe, so drive the manager's entry point
    // directly. For a tracked host a bad-quality first sample is the Unknown→Stopped case that
    // DOES fire, which makes it the right input to prove the rolled-back entry stays silent.
    mgr.OnProbeCallback("PlatformA.ScanState", Bad());

    transitions.Count.ShouldBe(0); // no phantom host-down signal
    mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Unknown);
    mgr.ActiveProbeCount.ShouldBe(0);
}
/// <summary>
/// The first <c>ScanState=true</c> sample moves a freshly synced host to Running without raising
/// <c>StateChanged</c> — the startup transition is deliberately silent.
/// </summary>
[Fact]
public async Task Unknown_to_Running_does_not_fire_StateChanged()
{
    const string probeTag = "PlatformA.ScanState";
    var fixedClock = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
    var fake = new FakeSubscriber();
    using var manager = new GalaxyRuntimeProbeManager(fake.Subscribe, fake.Unsubscribe, () => fixedClock);
    var observed = new ConcurrentQueue<HostStateTransition>();
    manager.StateChanged += (_, transition) => observed.Enqueue(transition);
    await manager.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });

    var deliver = fake.Subs[probeTag];
    deliver(probeTag, Good(true));

    manager.GetState("PlatformA").ShouldBe(HostRuntimeState.Running);
    observed.Count.ShouldBe(0); // startup transition, operators don't care
}
/// <summary>
/// After a silent Unknown→Running start, a <c>ScanState=false</c> sample raises exactly one
/// <c>StateChanged</c> whose record names the host and carries both the old and the new state.
/// </summary>
[Fact]
public async Task Running_to_Stopped_fires_StateChanged_with_both_states()
{
    const string probeTag = "PlatformA.ScanState";
    var fixedClock = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
    var fake = new FakeSubscriber();
    using var manager = new GalaxyRuntimeProbeManager(fake.Subscribe, fake.Unsubscribe, () => fixedClock);
    var observed = new ConcurrentQueue<HostStateTransition>();
    manager.StateChanged += (_, transition) => observed.Enqueue(transition);
    await manager.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });

    // The callback is looked up afresh for every delivery, exactly as a live subscription would be.
    fake.Subs[probeTag](probeTag, Good(true));  // Unknown→Running (silent)
    fake.Subs[probeTag](probeTag, Good(false)); // Running→Stopped (fires)

    observed.Count.ShouldBe(1);
    var dequeued = observed.TryDequeue(out var stopped);
    dequeued.ShouldBeTrue();
    stopped!.TagName.ShouldBe("PlatformA");
    stopped.OldState.ShouldBe(HostRuntimeState.Running);
    stopped.NewState.ShouldBe(HostRuntimeState.Stopped);
}
/// <summary>
/// A host that went Running→Stopped and then reports <c>ScanState=true</c> again raises a second
/// <c>StateChanged</c> describing the Stopped→Running recovery.
/// </summary>
/// <remarks>
/// Asserts the content and order of both transitions, not just their count, so a duplicated or
/// mislabelled event cannot pass as a recovery.
/// </remarks>
[Fact]
public async Task Stopped_to_Running_fires_StateChanged_for_recovery()
{
    var now = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
    var subs = new FakeSubscriber();
    using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe, () => now);
    var transitions = new ConcurrentQueue<HostStateTransition>();
    mgr.StateChanged += (_, transition) => transitions.Enqueue(transition);
    await mgr.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });

    subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));  // Unknown→Running (silent)
    subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(false)); // Running→Stopped (fires)
    subs.Subs["PlatformA.ScanState"]("PlatformA.ScanState", Good(true));  // Stopped→Running (fires)

    transitions.Count.ShouldBe(2);

    // First event: the host going down.
    transitions.TryDequeue(out var stopped).ShouldBeTrue();
    stopped!.TagName.ShouldBe("PlatformA");
    stopped.OldState.ShouldBe(HostRuntimeState.Running);
    stopped.NewState.ShouldBe(HostRuntimeState.Stopped);

    // Second event: the recovery this test is named after.
    transitions.TryDequeue(out var recovered).ShouldBeTrue();
    recovered!.TagName.ShouldBe("PlatformA");
    recovered.OldState.ShouldBe(HostRuntimeState.Stopped);
    recovered.NewState.ShouldBe(HostRuntimeState.Running);

    mgr.GetState("PlatformA").ShouldBe(HostRuntimeState.Running);
}
/// <summary>
/// If the very first sample for a host is bad-quality, the host goes Unknown→Stopped and
/// <c>StateChanged</c> fires once, so a host that is already down at startup is reported.
/// </summary>
[Fact]
public async Task Unknown_to_Stopped_fires_StateChanged_for_first_known_bad_signal()
{
    const string probeTag = "PlatformA.ScanState";
    var fixedClock = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
    var fake = new FakeSubscriber();
    using var manager = new GalaxyRuntimeProbeManager(fake.Subscribe, fake.Unsubscribe, () => fixedClock);
    var observed = new ConcurrentQueue<HostStateTransition>();
    manager.StateChanged += (_, transition) => observed.Enqueue(transition);
    await manager.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });

    // First callback is bad-quality — we must flag the host Stopped so operators see it.
    var deliver = fake.Subs[probeTag];
    deliver(probeTag, Bad());

    observed.Count.ShouldBe(1);
    var dequeued = observed.TryDequeue(out var firstBad);
    dequeued.ShouldBeTrue();
    firstBad!.OldState.ShouldBe(HostRuntimeState.Unknown);
    firstBad.NewState.ShouldBe(HostRuntimeState.Stopped);
}
/// <summary>
/// Five consecutive <c>ScanState=true</c> samples raise no <c>StateChanged</c> at all: the first
/// is the silent Unknown→Running start and the remaining four repeat the same state.
/// </summary>
[Fact]
public async Task Repeated_Good_Running_callbacks_do_not_fire_duplicate_events()
{
    const string probeTag = "PlatformA.ScanState";
    var fixedClock = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
    var fake = new FakeSubscriber();
    using var manager = new GalaxyRuntimeProbeManager(fake.Subscribe, fake.Unsubscribe, () => fixedClock);
    var fired = 0;
    manager.StateChanged += (_, _) =>
    {
        Interlocked.Increment(ref fired);
    };
    await manager.SyncAsync(new[] { new HostProbeTarget("PlatformA", 1) });

    for (var remaining = 5; remaining > 0; remaining--)
    {
        fake.Subs[probeTag](probeTag, Good(true));
    }

    fired.ShouldBe(0); // only the silent Unknown→Running on the first, no events after
}
/// <summary>
/// A callback for a probe the manager never synced is ignored: no probe becomes active and no
/// <c>StateChanged</c> event is raised.
/// </summary>
/// <remarks>
/// The method awaits nothing, so it is deliberately not <c>async</c> (avoids CS1998) while keeping
/// its <see cref="Task"/> return type. Both a good and a bad-quality sample are delivered; the
/// bad one is the case that would fire for a tracked host in the Unknown state.
/// </remarks>
[Fact]
public Task Unknown_callback_for_non_tracked_probe_is_dropped()
{
    var subs = new FakeSubscriber();
    using var mgr = new GalaxyRuntimeProbeManager(subs.Subscribe, subs.Unsubscribe);
    var fired = 0;
    mgr.StateChanged += (_, _) => Interlocked.Increment(ref fired);

    mgr.OnProbeCallback("ProbeForSomeoneElse.ScanState", Good(true));
    mgr.OnProbeCallback("ProbeForSomeoneElse.ScanState", Bad());

    mgr.ActiveProbeCount.ShouldBe(0);
    fired.ShouldBe(0);
    return Task.CompletedTask;
}
/// <summary>
/// <c>SnapshotStates</c> returns one entry per tracked host carrying its current state: here a
/// platform that reported good <c>ScanState=true</c> and an engine whose only sample was bad.
/// </summary>
[Fact]
public async Task Snapshot_reports_current_state_for_every_tracked_host()
{
    const string platformProbe = "PlatformA.ScanState";
    const string engineProbe = "EngineB.ScanState";
    var fixedClock = new DateTime(2026, 4, 18, 10, 0, 0, DateTimeKind.Utc);
    var fake = new FakeSubscriber();
    using var manager = new GalaxyRuntimeProbeManager(fake.Subscribe, fake.Unsubscribe, () => fixedClock);
    var hosts = new[]
    {
        new HostProbeTarget("PlatformA", 1),
        new HostProbeTarget("EngineB", 3),
    };
    await manager.SyncAsync(hosts);

    fake.Subs[platformProbe](platformProbe, Good(true)); // Running
    fake.Subs[engineProbe](engineProbe, Bad());          // Stopped

    var snapshot = manager.SnapshotStates();

    snapshot.Count.ShouldBe(2);
    snapshot.ShouldContain(entry => entry.TagName == "PlatformA" && entry.State == HostRuntimeState.Running);
    snapshot.ShouldContain(entry => entry.TagName == "EngineB" && entry.State == HostRuntimeState.Stopped);
}
/// <summary>
/// <c>IsRuntimeHost</c> is true for the WinPlatform and AppEngine category ids and false for the
/// two other ids tried here (4 and 11).
/// </summary>
[Fact]
public void IsRuntimeHost_recognizes_WinPlatform_and_AppEngine_category_ids()
{
    var platform = new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryWinPlatform);
    platform.IsRuntimeHost.ShouldBeTrue();

    var engine = new HostProbeTarget("X", GalaxyRuntimeProbeManager.CategoryAppEngine);
    engine.IsRuntimeHost.ShouldBeTrue();

    var area = new HostProbeTarget("X", 4 /* $Area */);
    area.IsRuntimeHost.ShouldBeFalse();

    var applicationObject = new HostProbeTarget("X", 11 /* $ApplicationObject */);
    applicationObject.IsRuntimeHost.ShouldBeFalse();
}
}