From 268b12edec8d56676d1bc9d55db93f96a1813117 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 18 Apr 2026 12:12:00 -0400 Subject: [PATCH] =?UTF-8?q?Phase=203=20PR=2023=20=E2=80=94=20Modbus=20IHos?= =?UTF-8?q?tConnectivityProbe.=20ModbusDriver=20now=20implements=206=20of?= =?UTF-8?q?=208=20capability=20interfaces=20(adds=20IHostConnectivityProbe?= =?UTF-8?q?=20alongside=20IDriver=20+=20ITagDiscovery=20+=20IReadable=20+?= =?UTF-8?q?=20IWritable=20+=20ISubscribable=20from=20the=20earlier=20PRs).?= =?UTF-8?q?=20Background=20probe=20loop=20kicks=20off=20in=20InitializeAsy?= =?UTF-8?q?nc=20when=20ModbusProbeOptions.Enabled=20is=20true,=20sends=20a?= =?UTF-8?q?=20cheap=20FC03=20read-1-register=20at=20ProbeAddress=20(defaul?= =?UTF-8?q?t=200)=20every=20Interval=20(default=205s)=20with=20a=20per-tic?= =?UTF-8?q?k=20Timeout=20(default=202s),=20and=20tracks=20the=20single=20M?= =?UTF-8?q?odbus=20endpoint's=20state=20in=20the=20HostState=20machine.=20?= =?UTF-8?q?Initial=20state=20=3D=20Unknown;=20first=20successful=20probe?= =?UTF-8?q?=20transitions=20to=20Running;=20any=20transport/timeout=20fail?= =?UTF-8?q?ure=20transitions=20to=20Stopped;=20recovery=20transitions=20ba?= =?UTF-8?q?ck=20to=20Running.=20OnHostStatusChanged=20fires=20exactly=20on?= =?UTF-8?q?=20transitions=20(not=20on=20repeat=20successes=20=E2=80=94=20p?= =?UTF-8?q?revents=20event-spam=20on=20a=20healthy=20connection).=20HostNa?= =?UTF-8?q?me=20format=20is=20'host:port'=20so=20the=20Admin=20UI=20can=20?= =?UTF-8?q?display=20the=20endpoint=20uniformly=20with=20Galaxy=20platform?= =?UTF-8?q?s/engines=20in=20the=20fleet=20status=20dashboard.=20GetHostSta?= =?UTF-8?q?tuses=20returns=20a=20single-item=20list=20with=20the=20current?= =?UTF-8?q?=20state=20+=20last-change=20timestamp=20(Modbus=20driver=20tal?= =?UTF-8?q?ks=20to=20exactly=20one=20endpoint=20per=20instance=20=E2=80=94?= =?UTF-8?q?=20operators=20spin=20up=20multiple=20driver=20instances=20for?= =?UTF-8?q?=20multi-PLC=20deployments).=20ShutdownAsync=20cancels=20the=20?= =?UTF-8?q?probe=20CTS=20before=20tearing=20down=20the=20transport=20so=20?= =?UTF-8?q?the=20loop=20can't=20log=20a=20spurious=20Stopped=20after=20int?= =?UTF-8?q?entional=20shutdown=20(OperationCanceledException=20caught=20se?= =?UTF-8?q?parately=20from=20the=20'real'=20transport=20errors).=20ModbusD?= =?UTF-8?q?riverOptions=20extended=20with=20ModbusProbeOptions=20sub-recor?= =?UTF-8?q?d=20(Enabled=20default=20true,=20Interval=205s,=20Timeout=202s,?= =?UTF-8?q?=20ProbeAddress=20ushort=20for=20PLCs=20that=20have=20register-?= =?UTF-8?q?0=20policies;=20most=20PLCs=20tolerate=20an=20FC03=20at=200=20b?= =?UTF-8?q?ut=20some=20industrial=20gateways=20lock=20it).=20Tests=20(7=20?= =?UTF-8?q?new=20ModbusProbeTests):=20Initial=5Fstate=5Fis=5FUnknown=5Fbef?= =?UTF-8?q?ore=5Ffirst=5Fprobe=5Ftick=20(probe=20disabled,=20state=20stays?= =?UTF-8?q?=20Unknown,=20HostName=20formatted);=20First=5Fsuccessful=5Fpro?= =?UTF-8?q?be=5Ftransitions=5Fto=5FRunning=20(enabled,=20waits=20for=20pro?= =?UTF-8?q?be=20count=20+=20event=20queue,=20asserts=20Unknown=20=E2=86=92?= =?UTF-8?q?=20Running=20with=20correct=20OldState/NewState);=20Transport?= =?UTF-8?q?=5Ffailure=5Ftransitions=5Fto=5FStopped=20(flip=20fake.Reachabl?= =?UTF-8?q?e=20=3D=20false=20mid-run,=20wait=20for=20state=20diff);=20Reco?= =?UTF-8?q?very=5Ftransitions=5FStopped=5Fback=5Fto=5FRunning=20(up=20?= =?UTF-8?q?=E2=86=92=20down=20=E2=86=92=20up,=20asserts=20=E2=89=A5=203=20?= =?UTF-8?q?transitions);=20Repeated=5Fsuccessful=5Fprobes=5Fdo=5Fnot=5Fgen?= =?UTF-8?q?erate=5Fduplicate=5FRunning=5Fevents=20(several=20hundred=20ms?= =?UTF-8?q?=20of=20stable=20probes,=20count=20stays=20at=201);=20Disabled?= =?UTF-8?q?=5Fprobe=5Fstays=5FUnknown=5Fand=5Ffires=5Fno=5Fevents=20(safet?= =?UTF-8?q?y=20guard=20when=20operator=20wants=20to=20disable=20probing);?= =?UTF-8?q?=20Shutdown=5Fstops=5Fthe=5Fprobe=5Floop=20(probe=20count=20cap?= =?UTF-8?q?tured=20at=20shutdown,=20delay=20400ms,=20assert=20=E2=89=A4=20?= =?UTF-8?q?1=20extra=20to=20tolerate=20the=20narrow=20race=20where=20an=20?= =?UTF-8?q?in-flight=20tick=20completes=20after=20shutdown=20=E2=80=94=20t?= =?UTF-8?q?he=20contract=20is=20'no=20new=20ticks=20scheduled'=20not=20'in?= =?UTF-8?q?stantaneous=20freeze').=20FlappyTransport=20fake=20exposes=20a?= =?UTF-8?q?=20volatile=20Reachable=20flag=20so=20tests=20can=20toggle=20th?= =?UTF-8?q?e=20PLC=20availability=20mid-run,=20+=20ProbeCount=20counter=20?= =?UTF-8?q?so=20tests=20can=20assert=20the=20loop=20actually=20issued=20re?= =?UTF-8?q?quests.=20WaitForStateAsync=20helper=20polls=20GetHostStatuses?= =?UTF-8?q?=20up=20to=20a=20deadline;=20tolerates=20scheduler=20jitter=20o?= =?UTF-8?q?n=20slow=20CI=20runners.=20Full=20solution:=200=20errors,=20202?= =?UTF-8?q?=20unit=20+=20integration=20tests=20pass=20(22=20Modbus=20+=201?= =?UTF-8?q?80=20pre-existing).?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ModbusDriver.cs | 84 ++++++- .../ModbusDriverOptions.cs | 20 ++ .../ModbusProbeTests.cs | 208 ++++++++++++++++++ 3 files changed, 311 insertions(+), 1 deletion(-) create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProbeTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs index 857ffff..1ec266f 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs @@ -17,7 +17,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus; /// public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId, Func? transportFactory = null) - : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IDisposable, IAsyncDisposable + : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable { // Active polling subscriptions. Each subscription owns a background Task that polls the // tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange @@ -26,6 +26,15 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta private long _nextSubscriptionId; public event EventHandler? OnDataChange; + public event EventHandler? OnHostStatusChanged; + + // Single-host probe state — Modbus driver talks to exactly one endpoint so the "hosts" + // collection has at most one entry. HostName is the Host:Port string so the Admin UI can + // display the PLC endpoint uniformly with Galaxy platforms/engines. + private readonly object _probeLock = new(); + private HostState _hostState = HostState.Unknown; + private DateTime _hostStateChangedUtc = DateTime.UtcNow; + private CancellationTokenSource? _probeCts; private readonly ModbusDriverOptions _options = options; private readonly Func _transportFactory = transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout)); @@ -46,6 +55,15 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta await _transport.ConnectAsync(cancellationToken).ConfigureAwait(false); foreach (var t in _options.Tags) _tagsByName[t.Name] = t; _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null); + + // PR 23: kick off the probe loop once the transport is up. Initial state stays + // Unknown until the first probe tick succeeds — avoids broadcasting a premature + // Running transition before any register round-trip has happened. + if (_options.Probe.Enabled) + { + _probeCts = new CancellationTokenSource(); + _ = Task.Run(() => ProbeLoopAsync(_probeCts.Token), _probeCts.Token); + } } catch (Exception ex) { @@ -62,6 +80,10 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta public async Task ShutdownAsync(CancellationToken cancellationToken) { + try { _probeCts?.Cancel(); } catch { } + _probeCts?.Dispose(); + _probeCts = null; + foreach (var state in _subscriptions.Values) { try { state.Cts.Cancel(); } catch { } @@ -313,6 +335,66 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta public string DiagnosticId => $"modbus-sub-{Id}"; } + // ---- IHostConnectivityProbe ---- + + public IReadOnlyList GetHostStatuses() + { + lock (_probeLock) + return [new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc)]; + } + + /// + /// Host identifier surfaced to IHostConnectivityProbe.GetHostStatuses and the Admin UI. + /// Formatted as host:port so multiple Modbus drivers in the same server disambiguate + /// by endpoint without needing the driver-instance-id in the Admin dashboard. + /// + public string HostName => $"{_options.Host}:{_options.Port}"; + + private async Task ProbeLoopAsync(CancellationToken ct) + { + var transport = _transport; // captured reference; disposal tears the loop down via ct + while (!ct.IsCancellationRequested) + { + var success = false; + try + { + using var probeCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + probeCts.CancelAfter(_options.Probe.Timeout); + var pdu = new byte[] { 0x03, + (byte)(_options.Probe.ProbeAddress >> 8), + (byte)(_options.Probe.ProbeAddress & 0xFF), 0x00, 0x01 }; + _ = await transport!.SendAsync(_options.UnitId, pdu, probeCts.Token).ConfigureAwait(false); + success = true; + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + return; + } + catch + { + // transport / timeout / exception PDU — treated as Stopped below + } + + TransitionTo(success ? HostState.Running : HostState.Stopped); + + try { await Task.Delay(_options.Probe.Interval, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + } + } + + private void TransitionTo(HostState newState) + { + HostState old; + lock (_probeLock) + { + old = _hostState; + if (old == newState) return; + _hostState = newState; + _hostStateChangedUtc = DateTime.UtcNow; + } + OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState)); + } + // ---- codec ---- internal static ushort RegisterCount(ModbusDataType t) => t switch diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs index 5a1b302..9b9731e 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs @@ -1,3 +1,5 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus; /// @@ -14,6 +16,24 @@ public sealed class ModbusDriverOptions /// Pre-declared tag map. Modbus has no discovery protocol — the driver returns exactly these. public IReadOnlyList Tags { get; init; } = []; + + /// + /// Background connectivity-probe settings. When + /// is true the driver runs a tick loop that issues a cheap FC03 at register 0 every + /// and raises OnHostStatusChanged on + /// Running ↔ Stopped transitions. The Admin UI / OPC UA clients see the state through + /// . + /// + public ModbusProbeOptions Probe { get; init; } = new(); +} + +public sealed class ModbusProbeOptions +{ + public bool Enabled { get; init; } = true; + public TimeSpan Interval { get; init; } = TimeSpan.FromSeconds(5); + public TimeSpan Timeout { get; init; } = TimeSpan.FromSeconds(2); + /// Register to read for the probe. Zero is usually safe; override for PLCs that lock register 0. + public ushort ProbeAddress { get; init; } = 0; } /// diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProbeTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProbeTests.cs new file mode 100644 index 0000000..12736a7 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProbeTests.cs @@ -0,0 +1,208 @@ +using System.Collections.Concurrent; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Modbus; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests; + +[Trait("Category", "Unit")] +public sealed class ModbusProbeTests +{ + /// + /// Transport fake the probe tests flip between "responding" and "unreachable" to + /// exercise the state machine. Calls to SendAsync with FC=0x03 count as probe traffic + /// (the driver's probe loop issues exactly that shape). + /// + private sealed class FlappyTransport : IModbusTransport + { + public volatile bool Reachable = true; + public int ProbeCount; + + public Task ConnectAsync(CancellationToken ct) => Task.CompletedTask; + + public Task SendAsync(byte unitId, byte[] pdu, CancellationToken ct) + { + if (pdu[0] == 0x03) Interlocked.Increment(ref ProbeCount); + if (!Reachable) + return Task.FromException(new IOException("transport unreachable")); + + // Happy path — return a valid FC03 response for 1 register at addr. + if (pdu[0] == 0x03) + { + var qty = (ushort)((pdu[3] << 8) | pdu[4]); + var resp = new byte[2 + qty * 2]; + resp[0] = 0x03; + resp[1] = (byte)(qty * 2); + return Task.FromResult(resp); + } + return Task.FromException(new NotSupportedException()); + } + + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } + + private static (ModbusDriver drv, FlappyTransport fake) NewDriver(ModbusProbeOptions probe) + { + var fake = new FlappyTransport(); + var opts = new ModbusDriverOptions { Host = "fake", Port = 502, Probe = probe }; + return (new ModbusDriver(opts, "modbus-1", _ => fake), fake); + } + + [Fact] + public async Task Initial_state_is_Unknown_before_first_probe_tick() + { + var (drv, _) = NewDriver(new ModbusProbeOptions { Enabled = false }); + await drv.InitializeAsync("{}", CancellationToken.None); + + var statuses = drv.GetHostStatuses(); + statuses.Count.ShouldBe(1); + statuses[0].State.ShouldBe(HostState.Unknown); + statuses[0].HostName.ShouldBe("fake:502"); + } + + [Fact] + public async Task First_successful_probe_transitions_to_Running() + { + var (drv, fake) = NewDriver(new ModbusProbeOptions + { + Enabled = true, + Interval = TimeSpan.FromMilliseconds(150), + Timeout = TimeSpan.FromSeconds(1), + }); + var transitions = new ConcurrentQueue(); + drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e); + + await drv.InitializeAsync("{}", CancellationToken.None); + + // Wait for the first probe to complete. + var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(2); + while (fake.ProbeCount == 0 && DateTime.UtcNow < deadline) await Task.Delay(25); + + // Then wait for the event to actually arrive. + deadline = DateTime.UtcNow + TimeSpan.FromSeconds(1); + while (transitions.Count == 0 && DateTime.UtcNow < deadline) await Task.Delay(25); + + transitions.Count.ShouldBeGreaterThanOrEqualTo(1); + transitions.TryDequeue(out var t).ShouldBeTrue(); + t!.OldState.ShouldBe(HostState.Unknown); + t.NewState.ShouldBe(HostState.Running); + drv.GetHostStatuses()[0].State.ShouldBe(HostState.Running); + + await drv.ShutdownAsync(CancellationToken.None); + } + + [Fact] + public async Task Transport_failure_transitions_to_Stopped() + { + var (drv, fake) = NewDriver(new ModbusProbeOptions + { + Enabled = true, + Interval = TimeSpan.FromMilliseconds(150), + Timeout = TimeSpan.FromSeconds(1), + }); + var transitions = new ConcurrentQueue(); + drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e); + + await drv.InitializeAsync("{}", CancellationToken.None); + await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2)); + + fake.Reachable = false; + await WaitForStateAsync(drv, HostState.Stopped, TimeSpan.FromSeconds(2)); + + transitions.Select(t => t.NewState).ShouldContain(HostState.Stopped); + await drv.ShutdownAsync(CancellationToken.None); + } + + [Fact] + public async Task Recovery_transitions_Stopped_back_to_Running() + { + var (drv, fake) = NewDriver(new ModbusProbeOptions + { + Enabled = true, + Interval = TimeSpan.FromMilliseconds(150), + Timeout = TimeSpan.FromSeconds(1), + }); + var transitions = new ConcurrentQueue(); + drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e); + + await drv.InitializeAsync("{}", CancellationToken.None); + await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2)); + + fake.Reachable = false; + await WaitForStateAsync(drv, HostState.Stopped, TimeSpan.FromSeconds(2)); + + fake.Reachable = true; + await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2)); + + // We expect at minimum: Unknown→Running, Running→Stopped, Stopped→Running. + transitions.Count.ShouldBeGreaterThanOrEqualTo(3); + await drv.ShutdownAsync(CancellationToken.None); + } + + [Fact] + public async Task Repeated_successful_probes_do_not_generate_duplicate_Running_events() + { + var (drv, _) = NewDriver(new ModbusProbeOptions + { + Enabled = true, + Interval = TimeSpan.FromMilliseconds(100), + Timeout = TimeSpan.FromSeconds(1), + }); + var transitions = new ConcurrentQueue(); + drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e); + + await drv.InitializeAsync("{}", CancellationToken.None); + await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2)); + await Task.Delay(500); // several more probe ticks, all successful — state shouldn't thrash + + transitions.Count.ShouldBe(1); // only the initial Unknown→Running + await drv.ShutdownAsync(CancellationToken.None); + } + + [Fact] + public async Task Disabled_probe_stays_Unknown_and_fires_no_events() + { + var (drv, _) = NewDriver(new ModbusProbeOptions { Enabled = false }); + var transitions = new ConcurrentQueue(); + drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e); + + await drv.InitializeAsync("{}", CancellationToken.None); + await Task.Delay(300); + + transitions.Count.ShouldBe(0); + drv.GetHostStatuses()[0].State.ShouldBe(HostState.Unknown); + await drv.ShutdownAsync(CancellationToken.None); + } + + [Fact] + public async Task Shutdown_stops_the_probe_loop() + { + var (drv, fake) = NewDriver(new ModbusProbeOptions + { + Enabled = true, + Interval = TimeSpan.FromMilliseconds(100), + Timeout = TimeSpan.FromSeconds(1), + }); + await drv.InitializeAsync("{}", CancellationToken.None); + await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2)); + + var before = fake.ProbeCount; + await drv.ShutdownAsync(CancellationToken.None); + await Task.Delay(400); + + // A handful of in-flight ticks may complete after shutdown in a narrow race; the + // contract is that the loop stops scheduling new ones. Tolerate ≤1 extra. + (fake.ProbeCount - before).ShouldBeLessThanOrEqualTo(1); + } + + private static async Task WaitForStateAsync(ModbusDriver drv, HostState expected, TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (DateTime.UtcNow < deadline) + { + if (drv.GetHostStatuses()[0].State == expected) return; + await Task.Delay(25); + } + } +} -- 2.49.1