Merge pull request 'Phase 3 PR 23 — Modbus IHostConnectivityProbe' (#22) from phase-3-pr23-modbus-probe into v2
This commit was merged in pull request #22.
This commit is contained in:
@@ -17,7 +17,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
|||||||
/// </remarks>
|
/// </remarks>
|
||||||
public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
|
public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
|
||||||
Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
|
Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
|
||||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IDisposable, IAsyncDisposable
|
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
|
||||||
{
|
{
|
||||||
// Active polling subscriptions. Each subscription owns a background Task that polls the
|
// Active polling subscriptions. Each subscription owns a background Task that polls the
|
||||||
// tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange
|
// tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange
|
||||||
@@ -26,6 +26,15 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
|||||||
private long _nextSubscriptionId;
|
private long _nextSubscriptionId;
|
||||||
|
|
||||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||||
|
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
|
||||||
|
|
||||||
|
// Single-host probe state — Modbus driver talks to exactly one endpoint so the "hosts"
|
||||||
|
// collection has at most one entry. HostName is the Host:Port string so the Admin UI can
|
||||||
|
// display the PLC endpoint uniformly with Galaxy platforms/engines.
|
||||||
|
private readonly object _probeLock = new();
|
||||||
|
private HostState _hostState = HostState.Unknown;
|
||||||
|
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
|
||||||
|
private CancellationTokenSource? _probeCts;
|
||||||
private readonly ModbusDriverOptions _options = options;
|
private readonly ModbusDriverOptions _options = options;
|
||||||
private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory =
|
private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory =
|
||||||
transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout));
|
transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout));
|
||||||
@@ -46,6 +55,15 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
|||||||
await _transport.ConnectAsync(cancellationToken).ConfigureAwait(false);
|
await _transport.ConnectAsync(cancellationToken).ConfigureAwait(false);
|
||||||
foreach (var t in _options.Tags) _tagsByName[t.Name] = t;
|
foreach (var t in _options.Tags) _tagsByName[t.Name] = t;
|
||||||
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
|
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
|
||||||
|
|
||||||
|
// PR 23: kick off the probe loop once the transport is up. Initial state stays
|
||||||
|
// Unknown until the first probe tick succeeds — avoids broadcasting a premature
|
||||||
|
// Running transition before any register round-trip has happened.
|
||||||
|
if (_options.Probe.Enabled)
|
||||||
|
{
|
||||||
|
_probeCts = new CancellationTokenSource();
|
||||||
|
_ = Task.Run(() => ProbeLoopAsync(_probeCts.Token), _probeCts.Token);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -62,6 +80,10 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
|||||||
|
|
||||||
public async Task ShutdownAsync(CancellationToken cancellationToken)
|
public async Task ShutdownAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
|
try { _probeCts?.Cancel(); } catch { }
|
||||||
|
_probeCts?.Dispose();
|
||||||
|
_probeCts = null;
|
||||||
|
|
||||||
foreach (var state in _subscriptions.Values)
|
foreach (var state in _subscriptions.Values)
|
||||||
{
|
{
|
||||||
try { state.Cts.Cancel(); } catch { }
|
try { state.Cts.Cancel(); } catch { }
|
||||||
@@ -313,6 +335,66 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
|||||||
public string DiagnosticId => $"modbus-sub-{Id}";
|
public string DiagnosticId => $"modbus-sub-{Id}";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- IHostConnectivityProbe ----
|
||||||
|
|
||||||
|
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
|
||||||
|
{
|
||||||
|
lock (_probeLock)
|
||||||
|
return [new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc)];
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Host identifier surfaced to <c>IHostConnectivityProbe.GetHostStatuses</c> and the Admin UI.
|
||||||
|
/// Formatted as <c>host:port</c> so multiple Modbus drivers in the same server disambiguate
|
||||||
|
/// by endpoint without needing the driver-instance-id in the Admin dashboard.
|
||||||
|
/// </summary>
|
||||||
|
public string HostName => $"{_options.Host}:{_options.Port}";
|
||||||
|
|
||||||
|
private async Task ProbeLoopAsync(CancellationToken ct)
|
||||||
|
{
|
||||||
|
var transport = _transport; // captured reference; disposal tears the loop down via ct
|
||||||
|
while (!ct.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
var success = false;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var probeCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||||
|
probeCts.CancelAfter(_options.Probe.Timeout);
|
||||||
|
var pdu = new byte[] { 0x03,
|
||||||
|
(byte)(_options.Probe.ProbeAddress >> 8),
|
||||||
|
(byte)(_options.Probe.ProbeAddress & 0xFF), 0x00, 0x01 };
|
||||||
|
_ = await transport!.SendAsync(_options.UnitId, pdu, probeCts.Token).ConfigureAwait(false);
|
||||||
|
success = true;
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (ct.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
// transport / timeout / exception PDU — treated as Stopped below
|
||||||
|
}
|
||||||
|
|
||||||
|
TransitionTo(success ? HostState.Running : HostState.Stopped);
|
||||||
|
|
||||||
|
try { await Task.Delay(_options.Probe.Interval, ct).ConfigureAwait(false); }
|
||||||
|
catch (OperationCanceledException) { return; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void TransitionTo(HostState newState)
|
||||||
|
{
|
||||||
|
HostState old;
|
||||||
|
lock (_probeLock)
|
||||||
|
{
|
||||||
|
old = _hostState;
|
||||||
|
if (old == newState) return;
|
||||||
|
_hostState = newState;
|
||||||
|
_hostStateChangedUtc = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState));
|
||||||
|
}
|
||||||
|
|
||||||
// ---- codec ----
|
// ---- codec ----
|
||||||
|
|
||||||
internal static ushort RegisterCount(ModbusDataType t) => t switch
|
internal static ushort RegisterCount(ModbusDataType t) => t switch
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -14,6 +16,24 @@ public sealed class ModbusDriverOptions
|
|||||||
|
|
||||||
/// <summary>Pre-declared tag map. Modbus has no discovery protocol — the driver returns exactly these.</summary>
|
/// <summary>Pre-declared tag map. Modbus has no discovery protocol — the driver returns exactly these.</summary>
|
||||||
public IReadOnlyList<ModbusTagDefinition> Tags { get; init; } = [];
|
public IReadOnlyList<ModbusTagDefinition> Tags { get; init; } = [];
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Background connectivity-probe settings. When <see cref="ModbusProbeOptions.Enabled"/>
|
||||||
|
/// is true the driver runs a tick loop that issues a cheap FC03 at register 0 every
|
||||||
|
/// <see cref="ModbusProbeOptions.Interval"/> and raises <c>OnHostStatusChanged</c> on
|
||||||
|
/// Running ↔ Stopped transitions. The Admin UI / OPC UA clients see the state through
|
||||||
|
/// <see cref="IHostConnectivityProbe"/>.
|
||||||
|
/// </summary>
|
||||||
|
public ModbusProbeOptions Probe { get; init; } = new();
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed class ModbusProbeOptions
|
||||||
|
{
|
||||||
|
public bool Enabled { get; init; } = true;
|
||||||
|
public TimeSpan Interval { get; init; } = TimeSpan.FromSeconds(5);
|
||||||
|
public TimeSpan Timeout { get; init; } = TimeSpan.FromSeconds(2);
|
||||||
|
/// <summary>Register to read for the probe. Zero is usually safe; override for PLCs that lock register 0.</summary>
|
||||||
|
public ushort ProbeAddress { get; init; } = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
208
tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProbeTests.cs
Normal file
208
tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProbeTests.cs
Normal file
@@ -0,0 +1,208 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class ModbusProbeTests
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Transport fake the probe tests flip between "responding" and "unreachable" to
|
||||||
|
/// exercise the state machine. Calls to SendAsync with FC=0x03 count as probe traffic
|
||||||
|
/// (the driver's probe loop issues exactly that shape).
|
||||||
|
/// </summary>
|
||||||
|
private sealed class FlappyTransport : IModbusTransport
|
||||||
|
{
|
||||||
|
public volatile bool Reachable = true;
|
||||||
|
public int ProbeCount;
|
||||||
|
|
||||||
|
public Task ConnectAsync(CancellationToken ct) => Task.CompletedTask;
|
||||||
|
|
||||||
|
public Task<byte[]> SendAsync(byte unitId, byte[] pdu, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (pdu[0] == 0x03) Interlocked.Increment(ref ProbeCount);
|
||||||
|
if (!Reachable)
|
||||||
|
return Task.FromException<byte[]>(new IOException("transport unreachable"));
|
||||||
|
|
||||||
|
// Happy path — return a valid FC03 response for 1 register at addr.
|
||||||
|
if (pdu[0] == 0x03)
|
||||||
|
{
|
||||||
|
var qty = (ushort)((pdu[3] << 8) | pdu[4]);
|
||||||
|
var resp = new byte[2 + qty * 2];
|
||||||
|
resp[0] = 0x03;
|
||||||
|
resp[1] = (byte)(qty * 2);
|
||||||
|
return Task.FromResult(resp);
|
||||||
|
}
|
||||||
|
return Task.FromException<byte[]>(new NotSupportedException());
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static (ModbusDriver drv, FlappyTransport fake) NewDriver(ModbusProbeOptions probe)
|
||||||
|
{
|
||||||
|
var fake = new FlappyTransport();
|
||||||
|
var opts = new ModbusDriverOptions { Host = "fake", Port = 502, Probe = probe };
|
||||||
|
return (new ModbusDriver(opts, "modbus-1", _ => fake), fake);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Initial_state_is_Unknown_before_first_probe_tick()
|
||||||
|
{
|
||||||
|
var (drv, _) = NewDriver(new ModbusProbeOptions { Enabled = false });
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
|
||||||
|
var statuses = drv.GetHostStatuses();
|
||||||
|
statuses.Count.ShouldBe(1);
|
||||||
|
statuses[0].State.ShouldBe(HostState.Unknown);
|
||||||
|
statuses[0].HostName.ShouldBe("fake:502");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task First_successful_probe_transitions_to_Running()
|
||||||
|
{
|
||||||
|
var (drv, fake) = NewDriver(new ModbusProbeOptions
|
||||||
|
{
|
||||||
|
Enabled = true,
|
||||||
|
Interval = TimeSpan.FromMilliseconds(150),
|
||||||
|
Timeout = TimeSpan.FromSeconds(1),
|
||||||
|
});
|
||||||
|
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||||
|
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||||
|
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
|
||||||
|
// Wait for the first probe to complete.
|
||||||
|
var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(2);
|
||||||
|
while (fake.ProbeCount == 0 && DateTime.UtcNow < deadline) await Task.Delay(25);
|
||||||
|
|
||||||
|
// Then wait for the event to actually arrive.
|
||||||
|
deadline = DateTime.UtcNow + TimeSpan.FromSeconds(1);
|
||||||
|
while (transitions.Count == 0 && DateTime.UtcNow < deadline) await Task.Delay(25);
|
||||||
|
|
||||||
|
transitions.Count.ShouldBeGreaterThanOrEqualTo(1);
|
||||||
|
transitions.TryDequeue(out var t).ShouldBeTrue();
|
||||||
|
t!.OldState.ShouldBe(HostState.Unknown);
|
||||||
|
t.NewState.ShouldBe(HostState.Running);
|
||||||
|
drv.GetHostStatuses()[0].State.ShouldBe(HostState.Running);
|
||||||
|
|
||||||
|
await drv.ShutdownAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Transport_failure_transitions_to_Stopped()
|
||||||
|
{
|
||||||
|
var (drv, fake) = NewDriver(new ModbusProbeOptions
|
||||||
|
{
|
||||||
|
Enabled = true,
|
||||||
|
Interval = TimeSpan.FromMilliseconds(150),
|
||||||
|
Timeout = TimeSpan.FromSeconds(1),
|
||||||
|
});
|
||||||
|
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||||
|
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||||
|
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
fake.Reachable = false;
|
||||||
|
await WaitForStateAsync(drv, HostState.Stopped, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
transitions.Select(t => t.NewState).ShouldContain(HostState.Stopped);
|
||||||
|
await drv.ShutdownAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Recovery_transitions_Stopped_back_to_Running()
|
||||||
|
{
|
||||||
|
var (drv, fake) = NewDriver(new ModbusProbeOptions
|
||||||
|
{
|
||||||
|
Enabled = true,
|
||||||
|
Interval = TimeSpan.FromMilliseconds(150),
|
||||||
|
Timeout = TimeSpan.FromSeconds(1),
|
||||||
|
});
|
||||||
|
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||||
|
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||||
|
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
fake.Reachable = false;
|
||||||
|
await WaitForStateAsync(drv, HostState.Stopped, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
fake.Reachable = true;
|
||||||
|
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
// We expect at minimum: Unknown→Running, Running→Stopped, Stopped→Running.
|
||||||
|
transitions.Count.ShouldBeGreaterThanOrEqualTo(3);
|
||||||
|
await drv.ShutdownAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Repeated_successful_probes_do_not_generate_duplicate_Running_events()
|
||||||
|
{
|
||||||
|
var (drv, _) = NewDriver(new ModbusProbeOptions
|
||||||
|
{
|
||||||
|
Enabled = true,
|
||||||
|
Interval = TimeSpan.FromMilliseconds(100),
|
||||||
|
Timeout = TimeSpan.FromSeconds(1),
|
||||||
|
});
|
||||||
|
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||||
|
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||||
|
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
|
||||||
|
await Task.Delay(500); // several more probe ticks, all successful — state shouldn't thrash
|
||||||
|
|
||||||
|
transitions.Count.ShouldBe(1); // only the initial Unknown→Running
|
||||||
|
await drv.ShutdownAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Disabled_probe_stays_Unknown_and_fires_no_events()
|
||||||
|
{
|
||||||
|
var (drv, _) = NewDriver(new ModbusProbeOptions { Enabled = false });
|
||||||
|
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||||
|
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||||
|
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
await Task.Delay(300);
|
||||||
|
|
||||||
|
transitions.Count.ShouldBe(0);
|
||||||
|
drv.GetHostStatuses()[0].State.ShouldBe(HostState.Unknown);
|
||||||
|
await drv.ShutdownAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Shutdown_stops_the_probe_loop()
|
||||||
|
{
|
||||||
|
var (drv, fake) = NewDriver(new ModbusProbeOptions
|
||||||
|
{
|
||||||
|
Enabled = true,
|
||||||
|
Interval = TimeSpan.FromMilliseconds(100),
|
||||||
|
Timeout = TimeSpan.FromSeconds(1),
|
||||||
|
});
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
var before = fake.ProbeCount;
|
||||||
|
await drv.ShutdownAsync(CancellationToken.None);
|
||||||
|
await Task.Delay(400);
|
||||||
|
|
||||||
|
// A handful of in-flight ticks may complete after shutdown in a narrow race; the
|
||||||
|
// contract is that the loop stops scheduling new ones. Tolerate ≤1 extra.
|
||||||
|
(fake.ProbeCount - before).ShouldBeLessThanOrEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task WaitForStateAsync(ModbusDriver drv, HostState expected, TimeSpan timeout)
|
||||||
|
{
|
||||||
|
var deadline = DateTime.UtcNow + timeout;
|
||||||
|
while (DateTime.UtcNow < deadline)
|
||||||
|
{
|
||||||
|
if (drv.GetHostStatuses()[0].State == expected) return;
|
||||||
|
await Task.Delay(25);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user