diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs
index 857ffff..1ec266f 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs
@@ -17,7 +17,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
 /// </summary>
 public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
     Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
-    : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IDisposable, IAsyncDisposable
+    : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
 {
     // Active polling subscriptions. Each subscription owns a background Task that polls the
     // tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange
@@ -26,6 +26,15 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
     private long _nextSubscriptionId;
 
     public event EventHandler<DataChangeEventArgs>? OnDataChange;
+    public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
+
+    // Single-host probe state — Modbus driver talks to exactly one endpoint so the "hosts"
+    // collection has at most one entry. HostName is the Host:Port string so the Admin UI can
+    // display the PLC endpoint uniformly with Galaxy platforms/engines.
+    private readonly object _probeLock = new();
+    private HostState _hostState = HostState.Unknown;
+    private DateTime _hostStateChangedUtc = DateTime.UtcNow;
+    private CancellationTokenSource? _probeCts;
 
     private readonly ModbusDriverOptions _options = options;
     private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory = transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout));
@@ -46,6 +55,19 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
             await _transport.ConnectAsync(cancellationToken).ConfigureAwait(false);
             foreach (var t in _options.Tags) _tagsByName[t.Name] = t;
             _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
+
+            // PR 23: kick off the probe loop once the transport is up. Initial state stays
+            // Unknown until the first probe tick succeeds — avoids broadcasting a premature
+            // Running transition before any register round-trip has happened.
+            if (_options.Probe.Enabled)
+            {
+                var probeCts = new CancellationTokenSource();
+                _probeCts = probeCts;
+                // Capture the token now: the lambda runs later on the thread pool, and by then
+                // ShutdownAsync may already have disposed the source and nulled the field.
+                var probeToken = probeCts.Token;
+                _ = Task.Run(() => ProbeLoopAsync(probeToken), probeToken);
+            }
         }
         catch (Exception ex)
         {
@@ -62,6 +84,10 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
 
     public async Task ShutdownAsync(CancellationToken cancellationToken)
     {
+        try { _probeCts?.Cancel(); } catch { }
+        _probeCts?.Dispose();
+        _probeCts = null;
+
         foreach (var state in _subscriptions.Values)
         {
             try { state.Cts.Cancel(); } catch { }
@@ -313,6 +339,85 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
         public string DiagnosticId => $"modbus-sub-{Id}";
     }
 
+    // ---- IHostConnectivityProbe ----
+
+    /// <summary>One-element snapshot: this driver's endpoint, its last probed state, and when that state was entered (UTC).</summary>
+    public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
+    {
+        lock (_probeLock)
+            return [new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc)];
+    }
+
+    /// <summary>
+    /// Host identifier surfaced to IHostConnectivityProbe.GetHostStatuses and the Admin UI.
+    /// Formatted as <c>host:port</c> so multiple Modbus drivers in the same server disambiguate
+    /// by endpoint without needing the driver-instance-id in the Admin dashboard.
+    /// </summary>
+    public string HostName => $"{_options.Host}:{_options.Port}";
+
+    /// <summary>
+    /// Background probe loop: each tick sends one FC03 single-register read and reports Running
+    /// when the send completes, Stopped when it throws or times out. Exits when <paramref name="ct"/> is cancelled.
+    /// </summary>
+    private async Task ProbeLoopAsync(CancellationToken ct)
+    {
+        var transport = _transport; // captured reference; disposal tears the loop down via ct
+        while (!ct.IsCancellationRequested)
+        {
+            var success = false;
+            try
+            {
+                using var probeCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
+                probeCts.CancelAfter(_options.Probe.Timeout);
+                // FC03 request: function code, start address (hi, lo), quantity = 1 register (hi, lo).
+                var pdu = new byte[] { 0x03,
+                    (byte)(_options.Probe.ProbeAddress >> 8),
+                    (byte)(_options.Probe.ProbeAddress & 0xFF), 0x00, 0x01 };
+                _ = await transport!.SendAsync(_options.UnitId, pdu, probeCts.Token).ConfigureAwait(false);
+                success = true;
+            }
+            catch (OperationCanceledException) when (ct.IsCancellationRequested)
+            {
+                return;
+            }
+            catch
+            {
+                // transport / timeout / exception PDU — treated as Stopped below
+            }
+
+            TransitionTo(success ? HostState.Running : HostState.Stopped);
+
+            try { await Task.Delay(_options.Probe.Interval, ct).ConfigureAwait(false); }
+            catch (OperationCanceledException) { return; }
+        }
+    }
+
+    /// <summary>
+    /// Records <paramref name="newState"/> and raises <see cref="OnHostStatusChanged"/> only when it
+    /// differs from the current state. The event fires after the lock is released.
+    /// </summary>
+    private void TransitionTo(HostState newState)
+    {
+        HostState old;
+        lock (_probeLock)
+        {
+            old = _hostState;
+            if (old == newState) return;
+            _hostState = newState;
+            _hostStateChangedUtc = DateTime.UtcNow;
+        }
+
+        try
+        {
+            OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState));
+        }
+        catch
+        {
+            // A throwing subscriber must not fault the fire-and-forget probe task; that would
+            // silently end the loop and freeze the reported host state. Best-effort, keep probing.
+        }
+    }
+
     // ---- codec ----
 
     internal static ushort RegisterCount(ModbusDataType t) => t switch
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs
index 5a1b302..9b9731e 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs
@@ -1,3 +1,5 @@
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
 namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
 
 /// <summary>
@@ -14,6 +16,31 @@ public sealed class ModbusDriverOptions
 
     /// <summary>Pre-declared tag map. Modbus has no discovery protocol — the driver returns exactly these.</summary>
     public IReadOnlyList<ModbusTagDefinition> Tags { get; init; } = [];
+
+    /// <summary>
+    /// Background connectivity-probe settings. When <see cref="ModbusProbeOptions.Enabled"/>
+    /// is true the driver runs a tick loop that issues a cheap FC03 read of
+    /// <see cref="ModbusProbeOptions.ProbeAddress"/> every <see cref="ModbusProbeOptions.Interval"/>
+    /// and raises <c>OnHostStatusChanged</c> on Running ↔ Stopped transitions. The Admin UI /
+    /// OPC UA clients see the state through <see cref="IHostConnectivityProbe"/>.
+    /// </summary>
+    public ModbusProbeOptions Probe { get; init; } = new();
+}
+
+/// <summary>Settings for the Modbus driver's background connectivity probe.</summary>
+public sealed class ModbusProbeOptions
+{
+    /// <summary>Whether the driver starts the probe loop after a successful connect. Default: true.</summary>
+    public bool Enabled { get; init; } = true;
+
+    /// <summary>Delay between probe ticks. Default: 5 seconds.</summary>
+    public TimeSpan Interval { get; init; } = TimeSpan.FromSeconds(5);
+
+    /// <summary>Per-probe time limit; a probe that exceeds it counts as a failure. Default: 2 seconds.</summary>
+    public TimeSpan Timeout { get; init; } = TimeSpan.FromSeconds(2);
+
+    /// <summary>Register to read for the probe. Zero is usually safe; override for PLCs that lock register 0.</summary>
+    public ushort ProbeAddress { get; init; } = 0;
 }
 
 /// <summary>
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProbeTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProbeTests.cs
new file mode 100644
index 0000000..12736a7
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProbeTests.cs
@@ -0,0 +1,216 @@
+using System.Collections.Concurrent;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+using ZB.MOM.WW.OtOpcUa.Driver.Modbus;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests;
+
+[Trait("Category", "Unit")]
+public sealed class ModbusProbeTests
+{
+    /// <summary>
+    /// Transport fake the probe tests flip between "responding" and "unreachable" to
+    /// exercise the state machine. Calls to SendAsync with FC=0x03 count as probe traffic
+    /// (the driver's probe loop issues exactly that shape).
+    /// </summary>
+    private sealed class FlappyTransport : IModbusTransport
+    {
+        public volatile bool Reachable = true;
+        public int ProbeCount;
+
+        public Task ConnectAsync(CancellationToken ct) => Task.CompletedTask;
+
+        public Task<byte[]> SendAsync(byte unitId, byte[] pdu, CancellationToken ct)
+        {
+            if (pdu[0] == 0x03) Interlocked.Increment(ref ProbeCount);
+            if (!Reachable)
+                return Task.FromException<byte[]>(new IOException("transport unreachable"));
+
+            // Happy path — return a valid FC03 response for 1 register at addr.
+            if (pdu[0] == 0x03)
+            {
+                var qty = (ushort)((pdu[3] << 8) | pdu[4]);
+                var resp = new byte[2 + qty * 2];
+                resp[0] = 0x03;
+                resp[1] = (byte)(qty * 2);
+                return Task.FromResult(resp);
+            }
+            return Task.FromException<byte[]>(new NotSupportedException());
+        }
+
+        public ValueTask DisposeAsync() => ValueTask.CompletedTask;
+    }
+
+    private static (ModbusDriver drv, FlappyTransport fake) NewDriver(ModbusProbeOptions probe)
+    {
+        var fake = new FlappyTransport();
+        var opts = new ModbusDriverOptions { Host = "fake", Port = 502, Probe = probe };
+        return (new ModbusDriver(opts, "modbus-1", _ => fake), fake);
+    }
+
+    [Fact]
+    public async Task Initial_state_is_Unknown_before_first_probe_tick()
+    {
+        var (drv, _) = NewDriver(new ModbusProbeOptions { Enabled = false });
+        await drv.InitializeAsync("{}", CancellationToken.None);
+
+        var statuses = drv.GetHostStatuses();
+        statuses.Count.ShouldBe(1);
+        statuses[0].State.ShouldBe(HostState.Unknown);
+        statuses[0].HostName.ShouldBe("fake:502");
+    }
+
+    [Fact]
+    public async Task First_successful_probe_transitions_to_Running()
+    {
+        var (drv, fake) = NewDriver(new ModbusProbeOptions
+        {
+            Enabled = true,
+            Interval = TimeSpan.FromMilliseconds(150),
+            Timeout = TimeSpan.FromSeconds(1),
+        });
+        var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
+        drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
+
+        await drv.InitializeAsync("{}", CancellationToken.None);
+
+        // Wait for the first probe to complete.
+        var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(2);
+        while (fake.ProbeCount == 0 && DateTime.UtcNow < deadline) await Task.Delay(25);
+
+        // Then wait for the event to actually arrive.
+        deadline = DateTime.UtcNow + TimeSpan.FromSeconds(1);
+        while (transitions.Count == 0 && DateTime.UtcNow < deadline) await Task.Delay(25);
+
+        transitions.Count.ShouldBeGreaterThanOrEqualTo(1);
+        transitions.TryDequeue(out var t).ShouldBeTrue();
+        t!.OldState.ShouldBe(HostState.Unknown);
+        t.NewState.ShouldBe(HostState.Running);
+        drv.GetHostStatuses()[0].State.ShouldBe(HostState.Running);
+
+        await drv.ShutdownAsync(CancellationToken.None);
+    }
+
+    [Fact]
+    public async Task Transport_failure_transitions_to_Stopped()
+    {
+        var (drv, fake) = NewDriver(new ModbusProbeOptions
+        {
+            Enabled = true,
+            Interval = TimeSpan.FromMilliseconds(150),
+            Timeout = TimeSpan.FromSeconds(1),
+        });
+        var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
+        drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
+
+        await drv.InitializeAsync("{}", CancellationToken.None);
+        await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
+
+        fake.Reachable = false;
+        await WaitForStateAsync(drv, HostState.Stopped, TimeSpan.FromSeconds(2));
+
+        transitions.Select(t => t.NewState).ShouldContain(HostState.Stopped);
+        await drv.ShutdownAsync(CancellationToken.None);
+    }
+
+    [Fact]
+    public async Task Recovery_transitions_Stopped_back_to_Running()
+    {
+        var (drv, fake) = NewDriver(new ModbusProbeOptions
+        {
+            Enabled = true,
+            Interval = TimeSpan.FromMilliseconds(150),
+            Timeout = TimeSpan.FromSeconds(1),
+        });
+        var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
+        drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
+
+        await drv.InitializeAsync("{}", CancellationToken.None);
+        await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
+
+        fake.Reachable = false;
+        await WaitForStateAsync(drv, HostState.Stopped, TimeSpan.FromSeconds(2));
+
+        fake.Reachable = true;
+        await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
+
+        // We expect at minimum: Unknown→Running, Running→Stopped, Stopped→Running.
+        transitions.Count.ShouldBeGreaterThanOrEqualTo(3);
+        await drv.ShutdownAsync(CancellationToken.None);
+    }
+
+    [Fact]
+    public async Task Repeated_successful_probes_do_not_generate_duplicate_Running_events()
+    {
+        var (drv, _) = NewDriver(new ModbusProbeOptions
+        {
+            Enabled = true,
+            Interval = TimeSpan.FromMilliseconds(100),
+            Timeout = TimeSpan.FromSeconds(1),
+        });
+        var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
+        drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
+
+        await drv.InitializeAsync("{}", CancellationToken.None);
+        await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
+        await Task.Delay(500); // several more probe ticks, all successful — state shouldn't thrash
+
+        transitions.Count.ShouldBe(1); // only the initial Unknown→Running
+        await drv.ShutdownAsync(CancellationToken.None);
+    }
+
+    [Fact]
+    public async Task Disabled_probe_stays_Unknown_and_fires_no_events()
+    {
+        var (drv, _) = NewDriver(new ModbusProbeOptions { Enabled = false });
+        var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
+        drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
+
+        await drv.InitializeAsync("{}", CancellationToken.None);
+        await Task.Delay(300);
+
+        transitions.Count.ShouldBe(0);
+        drv.GetHostStatuses()[0].State.ShouldBe(HostState.Unknown);
+        await drv.ShutdownAsync(CancellationToken.None);
+    }
+
+    [Fact]
+    public async Task Shutdown_stops_the_probe_loop()
+    {
+        var (drv, fake) = NewDriver(new ModbusProbeOptions
+        {
+            Enabled = true,
+            Interval = TimeSpan.FromMilliseconds(100),
+            Timeout = TimeSpan.FromSeconds(1),
+        });
+        await drv.InitializeAsync("{}", CancellationToken.None);
+        await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
+
+        var before = fake.ProbeCount;
+        await drv.ShutdownAsync(CancellationToken.None);
+        await Task.Delay(400);
+
+        // At most one in-flight tick may complete after shutdown in a narrow race; the
+        // contract is that the loop stops scheduling new ones. Tolerate ≤1 extra.
+        (fake.ProbeCount - before).ShouldBeLessThanOrEqualTo(1);
+    }
+
+    /// <summary>
+    /// Polls the driver until its single host reports <paramref name="expected"/>. Fails the
+    /// calling test if that state is not reached within <paramref name="timeout"/>.
+    /// </summary>
+    private static async Task WaitForStateAsync(ModbusDriver drv, HostState expected, TimeSpan timeout)
+    {
+        var deadline = DateTime.UtcNow + timeout;
+        while (DateTime.UtcNow < deadline)
+        {
+            if (drv.GetHostStatuses()[0].State == expected) return;
+            await Task.Delay(25);
+        }
+
+        // Timed out: assert instead of returning silently so a test cannot pass vacuously
+        // (e.g. Shutdown_stops_the_probe_loop when the probe never reached Running).
+        drv.GetHostStatuses()[0].State.ShouldBe(expected);
+    }
+}