Phase 3 PR 23 — Modbus IHostConnectivityProbe. ModbusDriver now implements 6 of 8 capability interfaces (adds IHostConnectivityProbe alongside IDriver + ITagDiscovery + IReadable + IWritable + ISubscribable from the earlier PRs). Background probe loop kicks off in InitializeAsync when ModbusProbeOptions.Enabled is true, sends a cheap FC03 read-1-register at ProbeAddress (default 0) every Interval (default 5s) with a per-tick Timeout (default 2s), and tracks the single Modbus endpoint's state in the HostState machine. Initial state = Unknown; first successful probe transitions to Running; any transport/timeout failure transitions to Stopped; recovery transitions back to Running. OnHostStatusChanged fires exactly on transitions (not on repeat successes — prevents event-spam on a healthy connection). HostName format is 'host:port' so the Admin UI can display the endpoint uniformly with Galaxy platforms/engines in the fleet status dashboard. GetHostStatuses returns a single-item list with the current state + last-change timestamp (Modbus driver talks to exactly one endpoint per instance — operators spin up multiple driver instances for multi-PLC deployments). ShutdownAsync cancels the probe CTS before tearing down the transport so the loop can't log a spurious Stopped after intentional shutdown (OperationCanceledException caught separately from the 'real' transport errors). ModbusDriverOptions extended with ModbusProbeOptions sub-record (Enabled default true, Interval 5s, Timeout 2s, ProbeAddress ushort for PLCs that have register-0 policies; most PLCs tolerate an FC03 at 0 but some industrial gateways lock it). Tests (7 new ModbusProbeTests): Initial_state_is_Unknown_before_first_probe_tick (probe disabled, state stays Unknown, HostName formatted); First_successful_probe_transitions_to_Running (enabled, waits for probe count + event queue, asserts Unknown → Running with correct OldState/NewState); Transport_failure_transitions_to_Stopped (flip fake.Reachable = false mid-run, wait for state diff); Recovery_transitions_Stopped_back_to_Running (up → down → up, asserts ≥ 3 transitions); Repeated_successful_probes_do_not_generate_duplicate_Running_events (several hundred ms of stable probes, count stays at 1); Disabled_probe_stays_Unknown_and_fires_no_events (safety guard when operator wants to disable probing); Shutdown_stops_the_probe_loop (probe count captured at shutdown, delay 400ms, assert ≤ 1 extra to tolerate the narrow race where an in-flight tick completes after shutdown — the contract is 'no new ticks scheduled' not 'instantaneous freeze'). FlappyTransport fake exposes a volatile Reachable flag so tests can toggle the PLC availability mid-run, + ProbeCount counter so tests can assert the loop actually issued requests. WaitForStateAsync helper polls GetHostStatuses up to a deadline; tolerates scheduler jitter on slow CI runners. Full solution: 0 errors, 202 unit + integration tests pass (22 Modbus + 180 pre-existing).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -17,7 +17,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
||||
/// </remarks>
|
||||
public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
|
||||
Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IDisposable, IAsyncDisposable
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
|
||||
{
|
||||
// Active polling subscriptions. Each subscription owns a background Task that polls the
|
||||
// tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange
|
||||
@@ -26,6 +26,15 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
||||
private long _nextSubscriptionId;
|
||||
|
||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
|
||||
|
||||
// Single-host probe state — Modbus driver talks to exactly one endpoint so the "hosts"
|
||||
// collection has at most one entry. HostName is the Host:Port string so the Admin UI can
|
||||
// display the PLC endpoint uniformly with Galaxy platforms/engines.
|
||||
private readonly object _probeLock = new();
|
||||
private HostState _hostState = HostState.Unknown;
|
||||
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
|
||||
private CancellationTokenSource? _probeCts;
|
||||
private readonly ModbusDriverOptions _options = options;
|
||||
private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory =
|
||||
transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout));
|
||||
@@ -46,6 +55,15 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
||||
await _transport.ConnectAsync(cancellationToken).ConfigureAwait(false);
|
||||
foreach (var t in _options.Tags) _tagsByName[t.Name] = t;
|
||||
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
|
||||
|
||||
// PR 23: kick off the probe loop once the transport is up. Initial state stays
|
||||
// Unknown until the first probe tick succeeds — avoids broadcasting a premature
|
||||
// Running transition before any register round-trip has happened.
|
||||
if (_options.Probe.Enabled)
|
||||
{
|
||||
_probeCts = new CancellationTokenSource();
|
||||
_ = Task.Run(() => ProbeLoopAsync(_probeCts.Token), _probeCts.Token);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -62,6 +80,10 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
||||
|
||||
public async Task ShutdownAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
try { _probeCts?.Cancel(); } catch { }
|
||||
_probeCts?.Dispose();
|
||||
_probeCts = null;
|
||||
|
||||
foreach (var state in _subscriptions.Values)
|
||||
{
|
||||
try { state.Cts.Cancel(); } catch { }
|
||||
@@ -313,6 +335,66 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
||||
public string DiagnosticId => $"modbus-sub-{Id}";
|
||||
}
|
||||
|
||||
// ---- IHostConnectivityProbe ----
|
||||
|
||||
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
|
||||
{
|
||||
lock (_probeLock)
|
||||
return [new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc)];
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Host identifier surfaced to <c>IHostConnectivityProbe.GetHostStatuses</c> and the Admin UI.
|
||||
/// Formatted as <c>host:port</c> so multiple Modbus drivers in the same server disambiguate
|
||||
/// by endpoint without needing the driver-instance-id in the Admin dashboard.
|
||||
/// </summary>
|
||||
public string HostName => $"{_options.Host}:{_options.Port}";
|
||||
|
||||
private async Task ProbeLoopAsync(CancellationToken ct)
|
||||
{
|
||||
var transport = _transport; // captured reference; disposal tears the loop down via ct
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
var success = false;
|
||||
try
|
||||
{
|
||||
using var probeCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
probeCts.CancelAfter(_options.Probe.Timeout);
|
||||
var pdu = new byte[] { 0x03,
|
||||
(byte)(_options.Probe.ProbeAddress >> 8),
|
||||
(byte)(_options.Probe.ProbeAddress & 0xFF), 0x00, 0x01 };
|
||||
_ = await transport!.SendAsync(_options.UnitId, pdu, probeCts.Token).ConfigureAwait(false);
|
||||
success = true;
|
||||
}
|
||||
catch (OperationCanceledException) when (ct.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
catch
|
||||
{
|
||||
// transport / timeout / exception PDU — treated as Stopped below
|
||||
}
|
||||
|
||||
TransitionTo(success ? HostState.Running : HostState.Stopped);
|
||||
|
||||
try { await Task.Delay(_options.Probe.Interval, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
}
|
||||
}
|
||||
|
||||
private void TransitionTo(HostState newState)
|
||||
{
|
||||
HostState old;
|
||||
lock (_probeLock)
|
||||
{
|
||||
old = _hostState;
|
||||
if (old == newState) return;
|
||||
_hostState = newState;
|
||||
_hostStateChangedUtc = DateTime.UtcNow;
|
||||
}
|
||||
OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState));
|
||||
}
|
||||
|
||||
// ---- codec ----
|
||||
|
||||
internal static ushort RegisterCount(ModbusDataType t) => t switch
|
||||
|
||||
Reference in New Issue
Block a user