PR 4.7 — Host-connectivity probes (IHostConnectivityProbe scaffold)

HostStatusAggregator merges transport + per-platform host entries with
change-event diffing (re-asserting same state is a no-op so a stable
ScanState=Running burst doesn't fan out duplicates). PerPlatformProbeWatcher
ports the legacy GalaxyRuntimeProbeManager state machine onto the gw
subscription path: SubscribeBulk for `<tag>.ScanState`, idempotent
SyncPlatformsAsync (subscribe new, unsubscribe dropped), and a
DecodeState helper pinning bool/int/string ScanState values + bad-quality
fallback. HostConnectivityForwarder is the skeleton for the gw-6
StreamSessionHealth signal — until that mxaccessgw RPC ships, PR 4.5's
ReconnectSupervisor pushes transport state by calling SetTransport on
session connect/disconnect.

GalaxyDriver wiring (implement IHostConnectivityProbe, route OnDataChange
to PerPlatformProbeWatcher, expose GetHostStatuses() / OnHostStatusChanged,
push transport from supervisor) is deferred to PR 4.W to avoid conflict
with the rest of the Phase 4 deferred wiring (4.5 supervisor + 4.6
DeployWatcher).

Tests: 19 new
- HostStatusAggregatorTests (9): empty snapshot, new-host change with
  Unknown predecessor, same-state silence, transition diff, snapshot
  reflects every host, case-insensitive host names, Remove returns true
  for tracked, Remove false for unknown, concurrent updates don't corrupt.
- HostConnectivityForwarderTests (5): SetTransport routes under client
  name, transitions fire change, repeated same-state silent, empty client
  name throws, post-dispose throws.
- PerPlatformProbeWatcherTests (5 + theory pinning DecodeState's full
  truth table): subscribe N platforms, idempotent re-sync, removed
  platforms unsubscribed + dropped from aggregator, OnProbeValueChanged
  routing for Running/Stopped/bad-quality/foreign-ref, Dispose
  unsubscribes everything.

NOTE: build is currently broken because mxaccessgw/clients/dotnet/ has
been removed from C:\Users\dohertj2\Desktop\mxaccessgw — this PR's source
is internally consistent and isolated from the missing dependency, but the
existing Driver.Galaxy code (PRs 4.1–4.6) can't compile until the .NET
client is restored. Once it is, expect 116 + 19 = 135 tests in the
Driver.Galaxy.Tests project.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-29 15:47:13 -04:00
parent 123e3e48b9
commit dae520b9c0
6 changed files with 755 additions and 0 deletions

View File

@@ -0,0 +1,83 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Health;
/// <summary>
/// Tests for <see cref="HostConnectivityForwarder"/>'s push path. The forwarder is a
/// thin shim over <see cref="HostStatusAggregator"/>; the only invariants worth pinning
/// are that SetTransport routes correctly under the configured client name and that
/// repeated identical pushes don't produce duplicate change events (the aggregator's
/// dedup carries that — this test asserts the forwarder doesn't re-introduce them).
/// </summary>
public sealed class HostConnectivityForwarderTests
{
[Fact]
public void SetTransport_Running_PushesUnderClientName()
{
var agg = new HostStatusAggregator();
var captured = new List<HostStatusChangedEventArgs>();
agg.OnHostStatusChanged += (_, e) => captured.Add(e);
var fwd = new HostConnectivityForwarder("OtOpcUa-A", agg);
fwd.SetTransport(HostState.Running);
captured.Count.ShouldBe(1);
captured[0].HostName.ShouldBe("OtOpcUa-A");
captured[0].NewState.ShouldBe(HostState.Running);
agg.Snapshot()[0].HostName.ShouldBe("OtOpcUa-A");
}
[Fact]
public void SetTransport_StateTransition_FiresChange()
{
var agg = new HostStatusAggregator();
var fwd = new HostConnectivityForwarder("OtOpcUa-A", agg);
fwd.SetTransport(HostState.Running);
var captured = new List<HostStatusChangedEventArgs>();
agg.OnHostStatusChanged += (_, e) => captured.Add(e);
fwd.SetTransport(HostState.Stopped);
captured.Count.ShouldBe(1);
captured[0].OldState.ShouldBe(HostState.Running);
captured[0].NewState.ShouldBe(HostState.Stopped);
}
[Fact]
public void SetTransport_RepeatedSameState_DoesNotFire()
{
var agg = new HostStatusAggregator();
var fwd = new HostConnectivityForwarder("OtOpcUa-A", agg);
fwd.SetTransport(HostState.Running);
var captured = new List<HostStatusChangedEventArgs>();
agg.OnHostStatusChanged += (_, e) => captured.Add(e);
fwd.SetTransport(HostState.Running);
fwd.SetTransport(HostState.Running);
fwd.SetTransport(HostState.Running);
captured.ShouldBeEmpty();
}
[Fact]
public void Constructor_RejectsEmptyClientName()
{
var agg = new HostStatusAggregator();
Should.Throw<ArgumentException>(() => new HostConnectivityForwarder("", agg));
Should.Throw<ArgumentException>(() => new HostConnectivityForwarder(" ", agg));
}
[Fact]
public void SetTransport_AfterDispose_Throws()
{
var agg = new HostStatusAggregator();
var fwd = new HostConnectivityForwarder("OtOpcUa-A", agg);
fwd.Dispose();
Should.Throw<ObjectDisposedException>(() => fwd.SetTransport(HostState.Running));
}
}

View File

@@ -0,0 +1,137 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Health;
/// <summary>
/// Tests for <see cref="HostStatusAggregator"/> — the merge + diff logic for the
/// transport entry plus per-platform probe entries that
/// <c>IHostConnectivityProbe.GetHostStatuses()</c> surfaces.
/// </summary>
public sealed class HostStatusAggregatorTests
{
private static HostConnectivityStatus Status(string host, HostState state) =>
new(host, state, DateTime.UtcNow);
[Fact]
public void Snapshot_Empty_WhenNothingTracked()
{
var agg = new HostStatusAggregator();
agg.Snapshot().ShouldBeEmpty();
}
[Fact]
public void Update_NewHost_FiresChange_PreviousIsUnknown()
{
var agg = new HostStatusAggregator();
var captured = new List<HostStatusChangedEventArgs>();
agg.OnHostStatusChanged += (_, e) => captured.Add(e);
agg.Update(Status("PlatformA", HostState.Running));
captured.Count.ShouldBe(1);
captured[0].HostName.ShouldBe("PlatformA");
captured[0].OldState.ShouldBe(HostState.Unknown);
captured[0].NewState.ShouldBe(HostState.Running);
}
[Fact]
public void Update_SameState_DoesNotFire()
{
var agg = new HostStatusAggregator();
agg.Update(Status("PlatformA", HostState.Running));
var captured = new List<HostStatusChangedEventArgs>();
agg.OnHostStatusChanged += (_, e) => captured.Add(e);
agg.Update(Status("PlatformA", HostState.Running));
captured.ShouldBeEmpty();
}
[Fact]
public void Update_StateTransition_FiresChangeWithCorrectPreviousAndNew()
{
var agg = new HostStatusAggregator();
agg.Update(Status("PlatformA", HostState.Running));
var captured = new List<HostStatusChangedEventArgs>();
agg.OnHostStatusChanged += (_, e) => captured.Add(e);
agg.Update(Status("PlatformA", HostState.Stopped));
captured.Count.ShouldBe(1);
captured[0].OldState.ShouldBe(HostState.Running);
captured[0].NewState.ShouldBe(HostState.Stopped);
}
[Fact]
public void Snapshot_ReflectsEveryUpsertedHost()
{
var agg = new HostStatusAggregator();
agg.Update(Status("Transport", HostState.Running));
agg.Update(Status("PlatformA", HostState.Running));
agg.Update(Status("PlatformB", HostState.Stopped));
var snap = agg.Snapshot();
snap.Count.ShouldBe(3);
snap.Select(s => s.HostName).OrderBy(x => x).ShouldBe(new[] { "PlatformA", "PlatformB", "Transport" });
snap.First(s => s.HostName == "PlatformB").State.ShouldBe(HostState.Stopped);
}
[Fact]
public void Update_HostNameComparison_IsCaseInsensitive()
{
var agg = new HostStatusAggregator();
var captured = new List<HostStatusChangedEventArgs>();
agg.OnHostStatusChanged += (_, e) => captured.Add(e);
agg.Update(Status("PlatformA", HostState.Running));
agg.Update(Status("platforma", HostState.Stopped)); // same host, different case
captured.Count.ShouldBe(2);
captured[1].OldState.ShouldBe(HostState.Running);
captured[1].NewState.ShouldBe(HostState.Stopped);
agg.Snapshot().Count.ShouldBe(1);
}
[Fact]
public void Remove_TrackedHost_ReturnsTrue_AndDropsFromSnapshot()
{
var agg = new HostStatusAggregator();
agg.Update(Status("PlatformA", HostState.Running));
agg.Remove("PlatformA").ShouldBeTrue();
agg.Snapshot().ShouldBeEmpty();
}
[Fact]
public void Remove_UnknownHost_ReturnsFalse()
{
var agg = new HostStatusAggregator();
agg.Remove("Nope").ShouldBeFalse();
}
[Fact]
public void ConcurrentUpdates_DoNotCorruptDictionary()
{
var agg = new HostStatusAggregator();
const int threadCount = 8;
const int updatesPerThread = 250;
var tasks = Enumerable.Range(0, threadCount).Select(t => Task.Run(() =>
{
for (var i = 0; i < updatesPerThread; i++)
{
var hostName = $"Host{(t * updatesPerThread + i) % 32}";
var state = i % 2 == 0 ? HostState.Running : HostState.Stopped;
agg.Update(Status(hostName, state));
}
})).ToArray();
Task.WaitAll(tasks);
agg.Snapshot().Count.ShouldBeLessThanOrEqualTo(32);
}
}

View File

@@ -0,0 +1,191 @@
using MxGateway.Contracts.Proto;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Health;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Health;
/// <summary>
/// Tests for <see cref="PerPlatformProbeWatcher"/> — the per-platform probe state
/// machine. Uses a fake <see cref="IGalaxySubscriber"/> to control SubscribeBulk
/// results and assert the watcher subscribes the right addresses + decodes ScanState
/// values correctly.
/// </summary>
public sealed class PerPlatformProbeWatcherTests
{
private sealed class FakeSubscriber : IGalaxySubscriber
{
public List<List<string>> Subscribes { get; } = [];
public List<List<int>> Unsubscribes { get; } = [];
private int _nextHandle = 1;
public Dictionary<string, int> HandleByAddress { get; } = new(StringComparer.OrdinalIgnoreCase);
public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
{
Subscribes.Add([.. fullReferences]);
var results = new List<SubscribeResult>(fullReferences.Count);
foreach (var addr in fullReferences)
{
var handle = Interlocked.Increment(ref _nextHandle);
HandleByAddress[addr] = handle;
results.Add(new SubscribeResult
{
TagAddress = addr,
ItemHandle = handle,
WasSuccessful = true,
});
}
return Task.FromResult<IReadOnlyList<SubscribeResult>>(results);
}
public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
{
Unsubscribes.Add([.. itemHandles]);
return Task.CompletedTask;
}
public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
=> Empty();
private static async IAsyncEnumerable<MxEvent> Empty()
{
await Task.CompletedTask;
yield break;
}
}
[Fact]
public async Task SyncPlatformsAsync_SubscribesScanStateAddressForEachPlatform()
{
var subscriber = new FakeSubscriber();
var aggregator = new HostStatusAggregator();
using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator);
await watcher.SyncPlatformsAsync(["PlatformA", "PlatformB"], CancellationToken.None);
subscriber.Subscribes.Count.ShouldBe(1);
subscriber.Subscribes[0].ShouldBe(new[] { "PlatformA.ScanState", "PlatformB.ScanState" });
watcher.WatchedPlatforms.OrderBy(x => x).ShouldBe(new[] { "PlatformA", "PlatformB" });
}
[Fact]
public async Task SyncPlatformsAsync_SameSetTwice_DoesNotResubscribe()
{
var subscriber = new FakeSubscriber();
var aggregator = new HostStatusAggregator();
using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator);
await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None);
await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None);
subscriber.Subscribes.Count.ShouldBe(1);
}
[Fact]
public async Task SyncPlatformsAsync_RemovedPlatforms_AreUnsubscribed_AndDroppedFromAggregator()
{
var subscriber = new FakeSubscriber();
var aggregator = new HostStatusAggregator();
using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator);
await watcher.SyncPlatformsAsync(["A", "B"], CancellationToken.None);
var bHandle = subscriber.HandleByAddress["B.ScanState"];
// Push a value so B is in the aggregator before we remove it.
watcher.OnProbeValueChanged("B.ScanState", true, qualityByte: 192);
aggregator.Snapshot().Any(s => s.HostName == "B").ShouldBeTrue();
await watcher.SyncPlatformsAsync(["A"], CancellationToken.None);
subscriber.Unsubscribes.Count.ShouldBe(1);
subscriber.Unsubscribes[0].ShouldBe(new[] { bHandle });
watcher.WatchedPlatforms.ShouldBe(new[] { "A" });
aggregator.Snapshot().Any(s => s.HostName == "B").ShouldBeFalse();
}
[Theory]
[InlineData(true, (byte)192, HostState.Running)]
[InlineData(false, (byte)192, HostState.Stopped)]
[InlineData(1, (byte)192, HostState.Running)]
[InlineData(0, (byte)192, HostState.Stopped)]
[InlineData("Running", (byte)192, HostState.Running)]
[InlineData("Stopped", (byte)192, HostState.Stopped)]
[InlineData("running", (byte)192, HostState.Running)]
[InlineData(2, (byte)192, HostState.Faulted)] // unknown int
[InlineData("Whatever", (byte)192, HostState.Faulted)] // unknown string
[InlineData(true, (byte)64, HostState.Unknown)] // bad quality wins
[InlineData(true, (byte)0, HostState.Unknown)]
public void DecodeState_TablePins(object? value, byte qualityByte, HostState expected)
{
PerPlatformProbeWatcher.DecodeState(value, qualityByte).ShouldBe(expected);
}
[Fact]
public async Task OnProbeValueChanged_Running_RoutesToAggregator()
{
var subscriber = new FakeSubscriber();
var aggregator = new HostStatusAggregator();
using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator);
await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None);
watcher.OnProbeValueChanged("PlatformA.ScanState", true, qualityByte: 192);
var snap = aggregator.Snapshot().Single(s => s.HostName == "PlatformA");
snap.State.ShouldBe(HostState.Running);
}
[Fact]
public async Task OnProbeValueChanged_BadQuality_RoutesUnknown()
{
var subscriber = new FakeSubscriber();
var aggregator = new HostStatusAggregator();
using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator);
await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None);
watcher.OnProbeValueChanged("PlatformA.ScanState", true, qualityByte: 0);
aggregator.Snapshot().Single(s => s.HostName == "PlatformA").State.ShouldBe(HostState.Unknown);
}
[Fact]
public async Task OnProbeValueChanged_ForeignReference_IsSilentlyDropped()
{
var subscriber = new FakeSubscriber();
var aggregator = new HostStatusAggregator();
using var watcher = new PerPlatformProbeWatcher(subscriber, aggregator);
await watcher.SyncPlatformsAsync(["PlatformA"], CancellationToken.None);
// Reference doesn't end with .ScanState — silently dropped.
watcher.OnProbeValueChanged("PlatformA.SomethingElse", true, qualityByte: 192);
aggregator.Snapshot().Any(s => s.HostName == "PlatformA").ShouldBeFalse();
// Unknown platform — silently dropped.
watcher.OnProbeValueChanged("Stranger.ScanState", true, qualityByte: 192);
aggregator.Snapshot().Any(s => s.HostName == "Stranger").ShouldBeFalse();
}
[Fact]
public async Task Dispose_UnsubscribesAllTrackedPlatforms()
{
var subscriber = new FakeSubscriber();
var aggregator = new HostStatusAggregator();
var watcher = new PerPlatformProbeWatcher(subscriber, aggregator);
await watcher.SyncPlatformsAsync(["A", "B", "C"], CancellationToken.None);
var expectedHandles = new[]
{
subscriber.HandleByAddress["A.ScanState"],
subscriber.HandleByAddress["B.ScanState"],
subscriber.HandleByAddress["C.ScanState"],
};
watcher.Dispose();
subscriber.Unsubscribes.Count.ShouldBe(1);
subscriber.Unsubscribes[0].OrderBy(x => x).ShouldBe(expectedHandles.OrderBy(x => x));
}
}