Phase 3 PR 23 — Modbus IHostConnectivityProbe #22

Merged
dohertj2 merged 1 commits from phase-3-pr23-modbus-probe into v2 2026-04-18 12:23:05 -04:00
3 changed files with 311 additions and 1 deletions

View File

@@ -17,7 +17,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
/// </remarks>
public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IDisposable, IAsyncDisposable
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
{
// Active polling subscriptions. Each subscription owns a background Task that polls the
// tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange
@@ -26,6 +26,15 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
private long _nextSubscriptionId;
public event EventHandler<DataChangeEventArgs>? OnDataChange;
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
// Single-host probe state — Modbus driver talks to exactly one endpoint so the "hosts"
// collection has at most one entry. HostName is the Host:Port string so the Admin UI can
// display the PLC endpoint uniformly with Galaxy platforms/engines.
private readonly object _probeLock = new();
private HostState _hostState = HostState.Unknown;
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
private CancellationTokenSource? _probeCts;
private readonly ModbusDriverOptions _options = options;
private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory =
transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout));
@@ -46,6 +55,15 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
await _transport.ConnectAsync(cancellationToken).ConfigureAwait(false);
foreach (var t in _options.Tags) _tagsByName[t.Name] = t;
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
// PR 23: kick off the probe loop once the transport is up. Initial state stays
// Unknown until the first probe tick succeeds — avoids broadcasting a premature
// Running transition before any register round-trip has happened.
if (_options.Probe.Enabled)
{
_probeCts = new CancellationTokenSource();
_ = Task.Run(() => ProbeLoopAsync(_probeCts.Token), _probeCts.Token);
}
}
catch (Exception ex)
{
@@ -62,6 +80,10 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
try { _probeCts?.Cancel(); } catch { }
_probeCts?.Dispose();
_probeCts = null;
foreach (var state in _subscriptions.Values)
{
try { state.Cts.Cancel(); } catch { }
@@ -313,6 +335,66 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
public string DiagnosticId => $"modbus-sub-{Id}";
}
// ---- IHostConnectivityProbe ----
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
{
lock (_probeLock)
return [new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc)];
}
/// <summary>
/// Host identifier surfaced to <c>IHostConnectivityProbe.GetHostStatuses</c> and the Admin UI.
/// Formatted as <c>host:port</c> so multiple Modbus drivers in the same server disambiguate
/// by endpoint without needing the driver-instance-id in the Admin dashboard.
/// </summary>
public string HostName => $"{_options.Host}:{_options.Port}";
private async Task ProbeLoopAsync(CancellationToken ct)
{
var transport = _transport; // captured reference; disposal tears the loop down via ct
while (!ct.IsCancellationRequested)
{
var success = false;
try
{
using var probeCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
probeCts.CancelAfter(_options.Probe.Timeout);
var pdu = new byte[] { 0x03,
(byte)(_options.Probe.ProbeAddress >> 8),
(byte)(_options.Probe.ProbeAddress & 0xFF), 0x00, 0x01 };
_ = await transport!.SendAsync(_options.UnitId, pdu, probeCts.Token).ConfigureAwait(false);
success = true;
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
return;
}
catch
{
// transport / timeout / exception PDU — treated as Stopped below
}
TransitionTo(success ? HostState.Running : HostState.Stopped);
try { await Task.Delay(_options.Probe.Interval, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
}
}
private void TransitionTo(HostState newState)
{
HostState old;
lock (_probeLock)
{
old = _hostState;
if (old == newState) return;
_hostState = newState;
_hostStateChangedUtc = DateTime.UtcNow;
}
OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState));
}
// ---- codec ----
internal static ushort RegisterCount(ModbusDataType t) => t switch

View File

@@ -1,3 +1,5 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
/// <summary>
@@ -14,6 +16,24 @@ public sealed class ModbusDriverOptions
/// <summary>Pre-declared tag map. Modbus has no discovery protocol — the driver returns exactly these.</summary>
public IReadOnlyList<ModbusTagDefinition> Tags { get; init; } = [];
/// <summary>
/// Background connectivity-probe settings. When <see cref="ModbusProbeOptions.Enabled"/>
/// is true the driver runs a tick loop that issues a cheap FC03 at register 0 every
/// <see cref="ModbusProbeOptions.Interval"/> and raises <c>OnHostStatusChanged</c> on
/// Running ↔ Stopped transitions. The Admin UI / OPC UA clients see the state through
/// <see cref="IHostConnectivityProbe"/>.
/// </summary>
public ModbusProbeOptions Probe { get; init; } = new();
}
public sealed class ModbusProbeOptions
{
public bool Enabled { get; init; } = true;
public TimeSpan Interval { get; init; } = TimeSpan.FromSeconds(5);
public TimeSpan Timeout { get; init; } = TimeSpan.FromSeconds(2);
/// <summary>Register to read for the probe. Zero is usually safe; override for PLCs that lock register 0.</summary>
public ushort ProbeAddress { get; init; } = 0;
}
/// <summary>

View File

@@ -0,0 +1,208 @@
using System.Collections.Concurrent;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Modbus;
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests;
[Trait("Category", "Unit")]
public sealed class ModbusProbeTests
{
/// <summary>
/// Transport fake the probe tests flip between "responding" and "unreachable" to
/// exercise the state machine. Calls to SendAsync with FC=0x03 count as probe traffic
/// (the driver's probe loop issues exactly that shape).
/// </summary>
private sealed class FlappyTransport : IModbusTransport
{
public volatile bool Reachable = true;
public int ProbeCount;
public Task ConnectAsync(CancellationToken ct) => Task.CompletedTask;
public Task<byte[]> SendAsync(byte unitId, byte[] pdu, CancellationToken ct)
{
if (pdu[0] == 0x03) Interlocked.Increment(ref ProbeCount);
if (!Reachable)
return Task.FromException<byte[]>(new IOException("transport unreachable"));
// Happy path — return a valid FC03 response for 1 register at addr.
if (pdu[0] == 0x03)
{
var qty = (ushort)((pdu[3] << 8) | pdu[4]);
var resp = new byte[2 + qty * 2];
resp[0] = 0x03;
resp[1] = (byte)(qty * 2);
return Task.FromResult(resp);
}
return Task.FromException<byte[]>(new NotSupportedException());
}
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
private static (ModbusDriver drv, FlappyTransport fake) NewDriver(ModbusProbeOptions probe)
{
var fake = new FlappyTransport();
var opts = new ModbusDriverOptions { Host = "fake", Port = 502, Probe = probe };
return (new ModbusDriver(opts, "modbus-1", _ => fake), fake);
}
[Fact]
public async Task Initial_state_is_Unknown_before_first_probe_tick()
{
var (drv, _) = NewDriver(new ModbusProbeOptions { Enabled = false });
await drv.InitializeAsync("{}", CancellationToken.None);
var statuses = drv.GetHostStatuses();
statuses.Count.ShouldBe(1);
statuses[0].State.ShouldBe(HostState.Unknown);
statuses[0].HostName.ShouldBe("fake:502");
}
[Fact]
public async Task First_successful_probe_transitions_to_Running()
{
var (drv, fake) = NewDriver(new ModbusProbeOptions
{
Enabled = true,
Interval = TimeSpan.FromMilliseconds(150),
Timeout = TimeSpan.FromSeconds(1),
});
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
await drv.InitializeAsync("{}", CancellationToken.None);
// Wait for the first probe to complete.
var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(2);
while (fake.ProbeCount == 0 && DateTime.UtcNow < deadline) await Task.Delay(25);
// Then wait for the event to actually arrive.
deadline = DateTime.UtcNow + TimeSpan.FromSeconds(1);
while (transitions.Count == 0 && DateTime.UtcNow < deadline) await Task.Delay(25);
transitions.Count.ShouldBeGreaterThanOrEqualTo(1);
transitions.TryDequeue(out var t).ShouldBeTrue();
t!.OldState.ShouldBe(HostState.Unknown);
t.NewState.ShouldBe(HostState.Running);
drv.GetHostStatuses()[0].State.ShouldBe(HostState.Running);
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task Transport_failure_transitions_to_Stopped()
{
var (drv, fake) = NewDriver(new ModbusProbeOptions
{
Enabled = true,
Interval = TimeSpan.FromMilliseconds(150),
Timeout = TimeSpan.FromSeconds(1),
});
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
await drv.InitializeAsync("{}", CancellationToken.None);
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
fake.Reachable = false;
await WaitForStateAsync(drv, HostState.Stopped, TimeSpan.FromSeconds(2));
transitions.Select(t => t.NewState).ShouldContain(HostState.Stopped);
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task Recovery_transitions_Stopped_back_to_Running()
{
var (drv, fake) = NewDriver(new ModbusProbeOptions
{
Enabled = true,
Interval = TimeSpan.FromMilliseconds(150),
Timeout = TimeSpan.FromSeconds(1),
});
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
await drv.InitializeAsync("{}", CancellationToken.None);
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
fake.Reachable = false;
await WaitForStateAsync(drv, HostState.Stopped, TimeSpan.FromSeconds(2));
fake.Reachable = true;
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
// We expect at minimum: Unknown→Running, Running→Stopped, Stopped→Running.
transitions.Count.ShouldBeGreaterThanOrEqualTo(3);
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task Repeated_successful_probes_do_not_generate_duplicate_Running_events()
{
var (drv, _) = NewDriver(new ModbusProbeOptions
{
Enabled = true,
Interval = TimeSpan.FromMilliseconds(100),
Timeout = TimeSpan.FromSeconds(1),
});
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
await drv.InitializeAsync("{}", CancellationToken.None);
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
await Task.Delay(500); // several more probe ticks, all successful — state shouldn't thrash
transitions.Count.ShouldBe(1); // only the initial Unknown→Running
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task Disabled_probe_stays_Unknown_and_fires_no_events()
{
var (drv, _) = NewDriver(new ModbusProbeOptions { Enabled = false });
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
await drv.InitializeAsync("{}", CancellationToken.None);
await Task.Delay(300);
transitions.Count.ShouldBe(0);
drv.GetHostStatuses()[0].State.ShouldBe(HostState.Unknown);
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task Shutdown_stops_the_probe_loop()
{
var (drv, fake) = NewDriver(new ModbusProbeOptions
{
Enabled = true,
Interval = TimeSpan.FromMilliseconds(100),
Timeout = TimeSpan.FromSeconds(1),
});
await drv.InitializeAsync("{}", CancellationToken.None);
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
var before = fake.ProbeCount;
await drv.ShutdownAsync(CancellationToken.None);
await Task.Delay(400);
// A handful of in-flight ticks may complete after shutdown in a narrow race; the
// contract is that the loop stops scheduling new ones. Tolerate ≤1 extra.
(fake.ProbeCount - before).ShouldBeLessThanOrEqualTo(1);
}
private static async Task WaitForStateAsync(ModbusDriver drv, HostState expected, TimeSpan timeout)
{
var deadline = DateTime.UtcNow + timeout;
while (DateTime.UtcNow < deadline)
{
if (drv.GetHostStatuses()[0].State == expected) return;
await Task.Delay(25);
}
}
}