diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/HostConnectivityForwarder.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/HostConnectivityForwarder.cs new file mode 100644 index 0000000..8929fc6 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/HostConnectivityForwarder.cs @@ -0,0 +1,58 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health; + +/// +/// Pushes the synthetic top-level transport-health entry into the +/// . Each driver instance has one entry under its +/// MxAccess.ClientName reflecting the gateway transport state — useful for +/// dashboards that want a single "Galaxy is up" signal independent of any individual +/// platform's ScanState. +/// +/// +/// The eventual production source for this signal is the gateway's StreamSessionHealth +/// RPC (mxaccessgw issue gw-6). Until that ships, the driver-side reconnect supervisor +/// (PR 4.5) calls on transport state transitions: +/// when the gw session re-Registers, +/// when the supervisor moves to TransportLost. The forwarder is intentionally +/// stateless beyond the cached client name + last-pushed value so the supervisor can +/// drive it without any back-pressure plumbing. +/// +public sealed class HostConnectivityForwarder : IDisposable +{ + private readonly string _clientName; + private readonly HostStatusAggregator _aggregator; + private readonly ILogger _logger; + private bool _disposed; + + public HostConnectivityForwarder(string clientName, HostStatusAggregator aggregator, ILogger? logger = null) + { + ArgumentException.ThrowIfNullOrWhiteSpace(clientName); + _clientName = clientName; + _aggregator = aggregator ?? throw new ArgumentNullException(nameof(aggregator)); + _logger = logger ?? NullLogger.Instance; + } + + /// + /// Push a transport state into the aggregator. Idempotent at the aggregator layer — + /// repeated calls with the same state don't fan out duplicate transitions. + /// + public void SetTransport(HostState state) + { + ObjectDisposedException.ThrowIf(_disposed, this); + var status = new HostConnectivityStatus(_clientName, state, DateTime.UtcNow); + _aggregator.Update(status); + _logger.LogDebug( + "GalaxyDriver transport state for {ClientName}: {State}", + _clientName, state); + } + + public void Dispose() + { + // No-op today; reserved for the eventual gw-6 StreamSessionHealth consumer that + // will own a long-running task this method tears down. + _disposed = true; + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/HostStatusAggregator.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/HostStatusAggregator.cs new file mode 100644 index 0000000..42a5fa4 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/HostStatusAggregator.cs @@ -0,0 +1,98 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health; + +/// +/// Pure-logic merger for the per-host connectivity entries that +/// surfaces. Holds the current set of host +/// statuses (one synthetic top-level transport entry plus one entry per +/// $WinPlatform/$AppEngine probe) and emits +/// only when an upsert actually changes a host's +/// — re-asserting the same state is a no-op so a stable +/// ScanState=Running burst doesn't fan out duplicate transitions. +/// +/// +/// This class owns the de-dup + diff logic that lived in +/// GalaxyProxyDriver.OnHostConnectivityUpdate in v1. The watcher +/// () and the transport forwarder +/// () both feed this aggregator; the +/// consumes from +/// IHostConnectivityProbe.GetHostStatuses() and re-raises +/// as the driver-level event in a follow-up PR. +/// +public sealed class HostStatusAggregator +{ + private readonly object _lock = new(); + private readonly Dictionary _byHost = + new(StringComparer.OrdinalIgnoreCase); + + /// + /// Fires when an call either introduces a new host or + /// transitions an existing host's . Handlers run + /// outside the internal lock so they can safely re-enter the aggregator + /// (e.g. the driver re-broadcasting through IHostConnectivityProbe). + /// + public event EventHandler? OnHostStatusChanged; + + /// + /// Snapshot the current host set. Suitable as the body of + /// IHostConnectivityProbe.GetHostStatuses(). + /// + public IReadOnlyList Snapshot() + { + lock (_lock) + { + return [.. _byHost.Values]; + } + } + + /// + /// Upsert the supplied status by . + /// Raises when the host is newly tracked + /// (previous state reported as ) or when its + /// state value differs from the last cached entry. Re-asserting the same + /// state is silent. + /// + public void Update(HostConnectivityStatus status) + { + ArgumentNullException.ThrowIfNull(status); + + HostState previous; + bool changed; + lock (_lock) + { + if (_byHost.TryGetValue(status.HostName, out var existing)) + { + previous = existing.State; + changed = existing.State != status.State; + } + else + { + previous = HostState.Unknown; + changed = true; + } + + _byHost[status.HostName] = status; + } + + if (changed) + { + OnHostStatusChanged?.Invoke(this, + new HostStatusChangedEventArgs(status.HostName, previous, status.State)); + } + } + + /// + /// Drop a host entirely (e.g. after a redeploy removes a Platform). No event + /// is fired — observers only react to live transitions, not topology + /// reductions. Returns true when the host was tracked. + /// + public bool Remove(string hostName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(hostName); + lock (_lock) + { + return _byHost.Remove(hostName); + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/PerPlatformProbeWatcher.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/PerPlatformProbeWatcher.cs new file mode 100644 index 0000000..b86a9cd --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Health/PerPlatformProbeWatcher.cs @@ -0,0 +1,188 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health; + +/// +/// Subscribes the ScanState attribute of every $WinPlatform / +/// $AppEngine object the discoverer surfaced and translates ScanState +/// value-changes into per-host updates. +/// Ports the state machine in +/// Driver.Galaxy.Host/Backend/Stability/GalaxyRuntimeProbeManager.cs onto the +/// gateway subscription path. +/// +/// +/// Address grammar: each platform tag's probe address is +/// {platformTagName}.ScanState. The watcher subscribes that address through +/// ; the EventPump (PR 4.4) routes inbound +/// OnDataChange events back via . State decoding: +/// +/// Quality < 192 (Good) → . +/// Value 1, true, or "Running" → . +/// Value 0, false, or "Stopped" → . +/// Anything else with Good quality → . +/// +/// is idempotent — call it after every +/// Discover / Rediscover. Newly-added platforms are subscribed; removed ones are +/// unsubscribed and dropped from the aggregator. +/// +public sealed class PerPlatformProbeWatcher : IDisposable +{ + public const string ProbeSuffix = ".ScanState"; + + private readonly IGalaxySubscriber _subscriber; + private readonly HostStatusAggregator _aggregator; + private readonly ILogger _logger; + + // Tracked platform → gw item handle. Item handle 0 means the gw rejected the subscribe; + // we keep the entry so SyncPlatformsAsync doesn't try to subscribe it again on every call. + private readonly ConcurrentDictionary _itemHandlesByPlatform = + new(StringComparer.OrdinalIgnoreCase); + private readonly Lock _syncLock = new(); + private bool _disposed; + + public PerPlatformProbeWatcher( + IGalaxySubscriber subscriber, HostStatusAggregator aggregator, ILogger? logger = null) + { + _subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber)); + _aggregator = aggregator ?? throw new ArgumentNullException(nameof(aggregator)); + _logger = logger ?? NullLogger.Instance; + } + + /// Snapshot of platform tag names currently watched. + public IReadOnlyCollection WatchedPlatforms => [.. _itemHandlesByPlatform.Keys]; + + /// + /// Reconcile the watched platform set against . + /// Subscribes new entries, unsubscribes dropped ones. Calling with the same set is + /// a no-op. + /// + public async Task SyncPlatformsAsync( + IEnumerable platformTagNames, CancellationToken cancellationToken) + { + ObjectDisposedException.ThrowIf(_disposed, this); + ArgumentNullException.ThrowIfNull(platformTagNames); + + var desired = new HashSet(platformTagNames, StringComparer.OrdinalIgnoreCase); + + // Compute deltas under the lock so concurrent SyncPlatformsAsync calls don't + // race on the membership view. + List toAdd; + List<(string Platform, int ItemHandle)> toRemove; + lock (_syncLock) + { + toAdd = [.. desired.Where(p => !_itemHandlesByPlatform.ContainsKey(p))]; + toRemove = [.. _itemHandlesByPlatform + .Where(kvp => !desired.Contains(kvp.Key) && kvp.Value > 0) + .Select(kvp => (kvp.Key, kvp.Value))]; + + // Drop removed entries from the membership map up-front so a concurrent + // OnProbeValueChanged for them is silently ignored. The unsubscribe RPC + // runs outside the lock. + foreach (var (platform, _) in toRemove) + { + _itemHandlesByPlatform.TryRemove(platform, out _); + _aggregator.Remove(platform); + } + } + + if (toRemove.Count > 0) + { + try + { + await _subscriber.UnsubscribeBulkAsync( + [.. toRemove.Select(t => t.ItemHandle)], cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "PerPlatformProbeWatcher unsubscribe failed for {Count} probe(s); aggregator entries already cleared.", + toRemove.Count); + } + } + + if (toAdd.Count == 0) return; + + var probeAddresses = toAdd.Select(p => p + ProbeSuffix).ToArray(); + // bufferedUpdateInterval=0 — probe ScanState changes are rare enough that the gw's + // default cadence is fine; explicit polling rate goes through PR 6.3. + var results = await _subscriber.SubscribeBulkAsync( + probeAddresses, bufferedUpdateIntervalMs: 0, cancellationToken).ConfigureAwait(false); + + for (var i = 0; i < toAdd.Count; i++) + { + var platform = toAdd[i]; + var match = results.FirstOrDefault(r => string.Equals( + r.TagAddress, probeAddresses[i], StringComparison.OrdinalIgnoreCase)); + + var itemHandle = match is { WasSuccessful: true } ? match.ItemHandle : 0; + _itemHandlesByPlatform[platform] = itemHandle; + + if (itemHandle <= 0) + { + _logger.LogWarning( + "PerPlatformProbeWatcher subscribe failed for {Platform}: {Error}", + platform, match?.ErrorMessage ?? ""); + } + } + } + + /// + /// Route an OnDataChange for a probe address into the aggregator. The EventPump + /// (PR 4.4) calls this; tests can drive it directly to exercise the state machine + /// without spinning a real gw. Foreign references (anything not ending in + /// , or a probe for a platform we're not tracking) are + /// silently dropped. + /// + public void OnProbeValueChanged(string fullReference, object? value, byte qualityByte) + { + if (_disposed) return; + ArgumentNullException.ThrowIfNull(fullReference); + + if (!fullReference.EndsWith(ProbeSuffix, StringComparison.OrdinalIgnoreCase)) return; + var platform = fullReference[..^ProbeSuffix.Length]; + if (!_itemHandlesByPlatform.ContainsKey(platform)) return; + + var state = DecodeState(value, qualityByte); + _aggregator.Update(new HostConnectivityStatus(platform, state, DateTime.UtcNow)); + } + + /// + /// Decode a ScanState value + raw quality byte to a . + /// Public for tests that want to pin the decoding table. + /// + public static HostState DecodeState(object? value, byte qualityByte) + { + if (qualityByte < 192) return HostState.Unknown; + + return value switch + { + bool b => b ? HostState.Running : HostState.Stopped, + int i => i == 1 ? HostState.Running : i == 0 ? HostState.Stopped : HostState.Faulted, + short s => s == 1 ? HostState.Running : s == 0 ? HostState.Stopped : HostState.Faulted, + long l => l == 1 ? HostState.Running : l == 0 ? HostState.Stopped : HostState.Faulted, + string str when string.Equals(str, "Running", StringComparison.OrdinalIgnoreCase) => HostState.Running, + string str when string.Equals(str, "Stopped", StringComparison.OrdinalIgnoreCase) => HostState.Stopped, + _ => HostState.Faulted, + }; + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + // Best-effort unsubscribe everything we know about. Run synchronously through + // GetAwaiter().GetResult() since Dispose is sync; transport errors are swallowed. + var liveHandles = _itemHandlesByPlatform.Values.Where(h => h > 0).ToArray(); + _itemHandlesByPlatform.Clear(); + if (liveHandles.Length > 0) + { + try { _subscriber.UnsubscribeBulkAsync(liveHandles, CancellationToken.None).GetAwaiter().GetResult(); } + catch (Exception ex) { _logger.LogWarning(ex, "PerPlatformProbeWatcher dispose unsubscribe failed"); } + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Health/HostConnectivityForwarderTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Health/HostConnectivityForwarderTests.cs new file mode 100644 index 0000000..8978fa5 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Health/HostConnectivityForwarderTests.cs @@ -0,0 +1,83 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Health; + +/// +/// Tests for 's push path. The forwarder is a +/// thin shim over ; the only invariants worth pinning +/// are that SetTransport routes correctly under the configured client name and that +/// repeated identical pushes don't produce duplicate change events (the aggregator's +/// dedup carries that — this test asserts the forwarder doesn't re-introduce them). +/// +public sealed class HostConnectivityForwarderTests +{ + [Fact] + public void SetTransport_Running_PushesUnderClientName() + { + var agg = new HostStatusAggregator(); + var captured = new List(); + agg.OnHostStatusChanged += (_, e) => captured.Add(e); + + var fwd = new HostConnectivityForwarder("OtOpcUa-A", agg); + fwd.SetTransport(HostState.Running); + + captured.Count.ShouldBe(1); + captured[0].HostName.ShouldBe("OtOpcUa-A"); + captured[0].NewState.ShouldBe(HostState.Running); + agg.Snapshot()[0].HostName.ShouldBe("OtOpcUa-A"); + } + + [Fact] + public void SetTransport_StateTransition_FiresChange() + { + var agg = new HostStatusAggregator(); + var fwd = new HostConnectivityForwarder("OtOpcUa-A", agg); + + fwd.SetTransport(HostState.Running); + var captured = new List(); + agg.OnHostStatusChanged += (_, e) => captured.Add(e); + + fwd.SetTransport(HostState.Stopped); + + captured.Count.ShouldBe(1); + captured[0].OldState.ShouldBe(HostState.Running); + captured[0].NewState.ShouldBe(HostState.Stopped); + } + + [Fact] + public void SetTransport_RepeatedSameState_DoesNotFire() + { + var agg = new HostStatusAggregator(); + var fwd = new HostConnectivityForwarder("OtOpcUa-A", agg); + + fwd.SetTransport(HostState.Running); + var captured = new List(); + agg.OnHostStatusChanged += (_, e) => captured.Add(e); + + fwd.SetTransport(HostState.Running); + fwd.SetTransport(HostState.Running); + fwd.SetTransport(HostState.Running); + + captured.ShouldBeEmpty(); + } + + [Fact] + public void Constructor_RejectsEmptyClientName() + { + var agg = new HostStatusAggregator(); + Should.Throw(() => new HostConnectivityForwarder("", agg)); + Should.Throw(() => new HostConnectivityForwarder(" ", agg)); + } + + [Fact] + public void SetTransport_AfterDispose_Throws() + { + var agg = new HostStatusAggregator(); + var fwd = new HostConnectivityForwarder("OtOpcUa-A", agg); + fwd.Dispose(); + Should.Throw(() => fwd.SetTransport(HostState.Running)); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Health/HostStatusAggregatorTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Health/HostStatusAggregatorTests.cs new file mode 100644 index 0000000..ff67151 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Health/HostStatusAggregatorTests.cs @@ -0,0 +1,137 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Health; + +/// +/// Tests for — the merge + diff logic for the +/// transport entry plus per-platform probe entries that +/// IHostConnectivityProbe.GetHostStatuses() surfaces. +/// +public sealed class HostStatusAggregatorTests +{ + private static HostConnectivityStatus Status(string host, HostState state) => + new(host, state, DateTime.UtcNow); + + [Fact] + public void Snapshot_Empty_WhenNothingTracked() + { + var agg = new HostStatusAggregator(); + agg.Snapshot().ShouldBeEmpty(); + } + + [Fact] + public void Update_NewHost_FiresChange_PreviousIsUnknown() + { + var agg = new HostStatusAggregator(); + var captured = new List(); + agg.OnHostStatusChanged += (_, e) => captured.Add(e); + + agg.Update(Status("PlatformA", HostState.Running)); + + captured.Count.ShouldBe(1); + captured[0].HostName.ShouldBe("PlatformA"); + captured[0].OldState.ShouldBe(HostState.Unknown); + captured[0].NewState.ShouldBe(HostState.Running); + } + + [Fact] + public void Update_SameState_DoesNotFire() + { + var agg = new HostStatusAggregator(); + agg.Update(Status("PlatformA", HostState.Running)); + + var captured = new List(); + agg.OnHostStatusChanged += (_, e) => captured.Add(e); + + agg.Update(Status("PlatformA", HostState.Running)); + + captured.ShouldBeEmpty(); + } + + [Fact] + public void Update_StateTransition_FiresChangeWithCorrectPreviousAndNew() + { + var agg = new HostStatusAggregator(); + agg.Update(Status("PlatformA", HostState.Running)); + + var captured = new List(); + agg.OnHostStatusChanged += (_, e) => captured.Add(e); + + agg.Update(Status("PlatformA", HostState.Stopped)); + + captured.Count.ShouldBe(1); + captured[0].OldState.ShouldBe(HostState.Running); + captured[0].NewState.ShouldBe(HostState.Stopped); + } + + [Fact] + public void Snapshot_ReflectsEveryUpsertedHost() + { + var agg = new HostStatusAggregator(); + agg.Update(Status("Transport", HostState.Running)); + agg.Update(Status("PlatformA", HostState.Running)); + agg.Update(Status("PlatformB", HostState.Stopped)); + + var snap = agg.Snapshot(); + + snap.Count.ShouldBe(3); + snap.Select(s => s.HostName).OrderBy(x => x).ShouldBe(new[] { "PlatformA", "PlatformB", "Transport" }); + snap.First(s => s.HostName == "PlatformB").State.ShouldBe(HostState.Stopped); + } + + [Fact] + public void Update_HostNameComparison_IsCaseInsensitive() + { + var agg = new HostStatusAggregator(); + var captured = new List(); + agg.OnHostStatusChanged += (_, e) => captured.Add(e); + + agg.Update(Status("PlatformA", HostState.Running)); + agg.Update(Status("platforma", HostState.Stopped)); // same host, different case + + captured.Count.ShouldBe(2); + captured[1].OldState.ShouldBe(HostState.Running); + captured[1].NewState.ShouldBe(HostState.Stopped); + agg.Snapshot().Count.ShouldBe(1); + } + + [Fact] + public void Remove_TrackedHost_ReturnsTrue_AndDropsFromSnapshot() + { + var agg = new HostStatusAggregator(); + agg.Update(Status("PlatformA", HostState.Running)); + agg.Remove("PlatformA").ShouldBeTrue(); + agg.Snapshot().ShouldBeEmpty(); + } + + [Fact] + public void Remove_UnknownHost_ReturnsFalse() + { + var agg = new HostStatusAggregator(); + agg.Remove("Nope").ShouldBeFalse(); + } + + [Fact] + public void ConcurrentUpdates_DoNotCorruptDictionary() + { + var agg = new HostStatusAggregator(); + const int threadCount = 8; + const int updatesPerThread = 250; + + var tasks = Enumerable.Range(0, threadCount).Select(t => Task.Run(() => + { + for (var i = 0; i < updatesPerThread; i++) + { + var hostName = $"Host{(t * updatesPerThread + i) % 32}"; + var state = i % 2 == 0 ? HostState.Running : HostState.Stopped; + agg.Update(Status(hostName, state)); + } + })).ToArray(); + + Task.WaitAll(tasks); + agg.Snapshot().Count.ShouldBeLessThanOrEqualTo(32); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Health/PerPlatformProbeWatcherTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Health/PerPlatformProbeWatcherTests.cs new file mode 100644 index 0000000..cd7dd9e --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Health/PerPlatformProbeWatcherTests.cs @@ -0,0 +1,191 @@ +using MxGateway.Contracts.Proto; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Health; + +/// +/// Tests for — the per-platform probe state +/// machine. Uses a fake to control SubscribeBulk +/// results and assert the watcher subscribes the right addresses + decodes ScanState +/// values correctly. +/// +public sealed class PerPlatformProbeWatcherTests +{ + private sealed class FakeSubscriber : IGalaxySubscriber + { + public List> Subscribes { get; } = []; + public List> Unsubscribes { get; } = []; + private int _nextHandle = 1; + public Dictionary HandleByAddress { get; } = new(StringComparer.OrdinalIgnoreCase); + + public Task> SubscribeBulkAsync( + IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) + { + Subscribes.Add([.. fullReferences]); + var results = new List(fullReferences.Count); + foreach (var addr in fullReferences) + { + var handle = Interlocked.Increment(ref _nextHandle); + HandleByAddress[addr] = handle; + results.Add(new SubscribeResult + { + TagAddress = addr, + ItemHandle = handle, + WasSuccessful = true, + }); + } + return Task.FromResult>(results); + } + + public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) + { + Unsubscribes.Add([.. itemHandles]); + return Task.CompletedTask; + } + + public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken) + => Empty(); + + private static async IAsyncEnumerable Empty() + { + await Task.CompletedTask; + yield break; + } + } + + [Fact] + public async Task SyncPlatformsAsync_SubscribesScanStateAddressForEachPlatform() + { + var subscriber = new FakeSubscriber(); + var aggregator = new HostStatusAggregator(); + using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); + + await watcher.SyncPlatformsAsync(["PlatformA", "PlatformB"], CancellationToken.None); + + subscriber.Subscribes.Count.ShouldBe(1); + subscriber.Subscribes[0].ShouldBe(new[] { "PlatformA.ScanState", "PlatformB.ScanState" }); + watcher.WatchedPlatforms.OrderBy(x => x).ShouldBe(new[] { "PlatformA", "PlatformB" }); + } + + [Fact] + public async Task SyncPlatformsAsync_SameSetTwice_DoesNotResubscribe() + { + var subscriber = new FakeSubscriber(); + var aggregator = new HostStatusAggregator(); + using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); + + await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); + await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); + + subscriber.Subscribes.Count.ShouldBe(1); + } + + [Fact] + public async Task SyncPlatformsAsync_RemovedPlatforms_AreUnsubscribed_AndDroppedFromAggregator() + { + var subscriber = new FakeSubscriber(); + var aggregator = new HostStatusAggregator(); + using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); + + await watcher.SyncPlatformsAsync(["A", "B"], CancellationToken.None); + var bHandle = subscriber.HandleByAddress["B.ScanState"]; + + // Push a value so B is in the aggregator before we remove it. + watcher.OnProbeValueChanged("B.ScanState", true, qualityByte: 192); + aggregator.Snapshot().Any(s => s.HostName == "B").ShouldBeTrue(); + + await watcher.SyncPlatformsAsync(["A"], CancellationToken.None); + + subscriber.Unsubscribes.Count.ShouldBe(1); + subscriber.Unsubscribes[0].ShouldBe(new[] { bHandle }); + watcher.WatchedPlatforms.ShouldBe(new[] { "A" }); + aggregator.Snapshot().Any(s => s.HostName == "B").ShouldBeFalse(); + } + + [Theory] + [InlineData(true, (byte)192, HostState.Running)] + [InlineData(false, (byte)192, HostState.Stopped)] + [InlineData(1, (byte)192, HostState.Running)] + [InlineData(0, (byte)192, HostState.Stopped)] + [InlineData("Running", (byte)192, HostState.Running)] + [InlineData("Stopped", (byte)192, HostState.Stopped)] + [InlineData("running", (byte)192, HostState.Running)] + [InlineData(2, (byte)192, HostState.Faulted)] // unknown int + [InlineData("Whatever", (byte)192, HostState.Faulted)] // unknown string + [InlineData(true, (byte)64, HostState.Unknown)] // bad quality wins + [InlineData(true, (byte)0, HostState.Unknown)] + public void DecodeState_TablePins(object? value, byte qualityByte, HostState expected) + { + PerPlatformProbeWatcher.DecodeState(value, qualityByte).ShouldBe(expected); + } + + [Fact] + public async Task OnProbeValueChanged_Running_RoutesToAggregator() + { + var subscriber = new FakeSubscriber(); + var aggregator = new HostStatusAggregator(); + using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); + + await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); + watcher.OnProbeValueChanged("PlatformA.ScanState", true, qualityByte: 192); + + var snap = aggregator.Snapshot().Single(s => s.HostName == "PlatformA"); + snap.State.ShouldBe(HostState.Running); + } + + [Fact] + public async Task OnProbeValueChanged_BadQuality_RoutesUnknown() + { + var subscriber = new FakeSubscriber(); + var aggregator = new HostStatusAggregator(); + using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); + + await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); + watcher.OnProbeValueChanged("PlatformA.ScanState", true, qualityByte: 0); + + aggregator.Snapshot().Single(s => s.HostName == "PlatformA").State.ShouldBe(HostState.Unknown); + } + + [Fact] + public async Task OnProbeValueChanged_ForeignReference_IsSilentlyDropped() + { + var subscriber = new FakeSubscriber(); + var aggregator = new HostStatusAggregator(); + using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); + + await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); + + // Reference doesn't end with .ScanState — silently dropped. + watcher.OnProbeValueChanged("PlatformA.SomethingElse", true, qualityByte: 192); + aggregator.Snapshot().Any(s => s.HostName == "PlatformA").ShouldBeFalse(); + + // Unknown platform — silently dropped. + watcher.OnProbeValueChanged("Stranger.ScanState", true, qualityByte: 192); + aggregator.Snapshot().Any(s => s.HostName == "Stranger").ShouldBeFalse(); + } + + [Fact] + public async Task Dispose_UnsubscribesAllTrackedPlatforms() + { + var subscriber = new FakeSubscriber(); + var aggregator = new HostStatusAggregator(); + var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); + + await watcher.SyncPlatformsAsync(["A", "B", "C"], CancellationToken.None); + var expectedHandles = new[] + { + subscriber.HandleByAddress["A.ScanState"], + subscriber.HandleByAddress["B.ScanState"], + subscriber.HandleByAddress["C.ScanState"], + }; + + watcher.Dispose(); + + subscriber.Unsubscribes.Count.ShouldBe(1); + subscriber.Unsubscribes[0].OrderBy(x => x).ShouldBe(expectedHandles.OrderBy(x => x)); + } +}