using MxGateway.Contracts.Proto; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Health; /// /// Tests for — the per-platform probe state /// machine. Uses a fake to control SubscribeBulk /// results and assert the watcher subscribes the right addresses + decodes ScanState /// values correctly. /// public sealed class PerPlatformProbeWatcherTests { private sealed class FakeSubscriber : IGalaxySubscriber { public List> Subscribes { get; } = []; public List SubscribeIntervalsMs { get; } = []; public List> Unsubscribes { get; } = []; private int _nextHandle = 1; public Dictionary HandleByAddress { get; } = new(StringComparer.OrdinalIgnoreCase); public Task> SubscribeBulkAsync( IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) { Subscribes.Add([.. fullReferences]); SubscribeIntervalsMs.Add(bufferedUpdateIntervalMs); var results = new List(fullReferences.Count); foreach (var addr in fullReferences) { var handle = Interlocked.Increment(ref _nextHandle); HandleByAddress[addr] = handle; results.Add(new SubscribeResult { TagAddress = addr, ItemHandle = handle, WasSuccessful = true, }); } return Task.FromResult>(results); } public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) { Unsubscribes.Add([.. itemHandles]); return Task.CompletedTask; } public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken) => Empty(); private static async IAsyncEnumerable Empty() { await Task.CompletedTask; yield break; } } [Fact] public async Task SyncPlatformsAsync_SubscribesScanStateAddressForEachPlatform() { var subscriber = new FakeSubscriber(); var aggregator = new HostStatusAggregator(); using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); await watcher.SyncPlatformsAsync(["PlatformA", "PlatformB"], CancellationToken.None); subscriber.Subscribes.Count.ShouldBe(1); subscriber.Subscribes[0].ShouldBe(new[] { "PlatformA.ScanState", "PlatformB.ScanState" }); watcher.WatchedPlatforms.OrderBy(x => x).ShouldBe(new[] { "PlatformA", "PlatformB" }); } [Fact] public async Task SyncPlatformsAsync_DefaultBufferedIntervalIsZero_GwCadence() { // PR 6.3 — without an override, the watcher passes 0 (gw default cadence) so // existing deployments don't see a behavior change. var subscriber = new FakeSubscriber(); using var watcher = new PerPlatformProbeWatcher(subscriber, new HostStatusAggregator()); await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); subscriber.SubscribeIntervalsMs.ShouldHaveSingleItem().ShouldBe(0); } [Fact] public async Task SyncPlatformsAsync_ConfiguredBufferedInterval_IsForwardedToGw() { // PR 6.3 — when a deployment dials down MxAccess.PublishingIntervalMs for // tighter health visibility, the probe watcher must forward that interval // through SubscribeBulk so the gw publishes ScanState changes at the // configured cadence. var subscriber = new FakeSubscriber(); using var watcher = new PerPlatformProbeWatcher( subscriber, new HostStatusAggregator(), bufferedUpdateIntervalMs: 250); await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); subscriber.SubscribeIntervalsMs.ShouldHaveSingleItem().ShouldBe(250); } [Fact] public void Constructor_RejectsNegativeBufferedInterval() { var subscriber = new FakeSubscriber(); Should.Throw(() => new PerPlatformProbeWatcher(subscriber, new HostStatusAggregator(), bufferedUpdateIntervalMs: -1)); } [Fact] public async Task SyncPlatformsAsync_SameSetTwice_DoesNotResubscribe() { var subscriber = new FakeSubscriber(); var aggregator = new HostStatusAggregator(); using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); subscriber.Subscribes.Count.ShouldBe(1); } [Fact] public async Task SyncPlatformsAsync_RemovedPlatforms_AreUnsubscribed_AndDroppedFromAggregator() { var subscriber = new FakeSubscriber(); var aggregator = new HostStatusAggregator(); using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); await watcher.SyncPlatformsAsync(["A", "B"], CancellationToken.None); var bHandle = subscriber.HandleByAddress["B.ScanState"]; // Push a value so B is in the aggregator before we remove it. watcher.OnProbeValueChanged("B.ScanState", true, qualityByte: 192); aggregator.Snapshot().Any(s => s.HostName == "B").ShouldBeTrue(); await watcher.SyncPlatformsAsync(["A"], CancellationToken.None); subscriber.Unsubscribes.Count.ShouldBe(1); subscriber.Unsubscribes[0].ShouldBe(new[] { bHandle }); watcher.WatchedPlatforms.ShouldBe(new[] { "A" }); aggregator.Snapshot().Any(s => s.HostName == "B").ShouldBeFalse(); } [Theory] [InlineData(true, (byte)192, HostState.Running)] [InlineData(false, (byte)192, HostState.Stopped)] [InlineData(1, (byte)192, HostState.Running)] [InlineData(0, (byte)192, HostState.Stopped)] [InlineData("Running", (byte)192, HostState.Running)] [InlineData("Stopped", (byte)192, HostState.Stopped)] [InlineData("running", (byte)192, HostState.Running)] [InlineData(2, (byte)192, HostState.Faulted)] // unknown int [InlineData("Whatever", (byte)192, HostState.Faulted)] // unknown string [InlineData(true, (byte)64, HostState.Unknown)] // bad quality wins [InlineData(true, (byte)0, HostState.Unknown)] public void DecodeState_TablePins(object? value, byte qualityByte, HostState expected) { PerPlatformProbeWatcher.DecodeState(value, qualityByte).ShouldBe(expected); } [Fact] public async Task OnProbeValueChanged_Running_RoutesToAggregator() { var subscriber = new FakeSubscriber(); var aggregator = new HostStatusAggregator(); using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); watcher.OnProbeValueChanged("PlatformA.ScanState", true, qualityByte: 192); var snap = aggregator.Snapshot().Single(s => s.HostName == "PlatformA"); snap.State.ShouldBe(HostState.Running); } [Fact] public async Task OnProbeValueChanged_BadQuality_RoutesUnknown() { var subscriber = new FakeSubscriber(); var aggregator = new HostStatusAggregator(); using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); watcher.OnProbeValueChanged("PlatformA.ScanState", true, qualityByte: 0); aggregator.Snapshot().Single(s => s.HostName == "PlatformA").State.ShouldBe(HostState.Unknown); } [Fact] public async Task OnProbeValueChanged_ForeignReference_IsSilentlyDropped() { var subscriber = new FakeSubscriber(); var aggregator = new HostStatusAggregator(); using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None); // Reference doesn't end with .ScanState — silently dropped. watcher.OnProbeValueChanged("PlatformA.SomethingElse", true, qualityByte: 192); aggregator.Snapshot().Any(s => s.HostName == "PlatformA").ShouldBeFalse(); // Unknown platform — silently dropped. watcher.OnProbeValueChanged("Stranger.ScanState", true, qualityByte: 192); aggregator.Snapshot().Any(s => s.HostName == "Stranger").ShouldBeFalse(); } [Fact] public async Task Dispose_UnsubscribesAllTrackedPlatforms() { var subscriber = new FakeSubscriber(); var aggregator = new HostStatusAggregator(); var watcher = new PerPlatformProbeWatcher(subscriber, aggregator); await watcher.SyncPlatformsAsync(["A", "B", "C"], CancellationToken.None); var expectedHandles = new[] { subscriber.HandleByAddress["A.ScanState"], subscriber.HandleByAddress["B.ScanState"], subscriber.HandleByAddress["C.ScanState"], }; watcher.Dispose(); subscriber.Unsubscribes.Count.ShouldBe(1); subscriber.Unsubscribes[0].OrderBy(x => x).ShouldBe(expectedHandles.OrderBy(x => x)); } }