Compare commits
4 Commits
phase-3-pr
...
phase-3-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
268b12edec | ||
| edce1be742 | |||
|
|
18b3e24710 | ||
| f6a12dafe9 |
@@ -17,8 +17,24 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
|||||||
/// </remarks>
|
/// </remarks>
|
||||||
public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
|
public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
|
||||||
Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
|
Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
|
||||||
: IDriver, ITagDiscovery, IReadable, IWritable, IDisposable, IAsyncDisposable
|
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
|
||||||
{
|
{
|
||||||
|
// Active polling subscriptions. Each subscription owns a background Task that polls the
|
||||||
|
// tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange
|
||||||
|
// per changed tag. UnsubscribeAsync cancels the task via the CTS stored on the handle.
|
||||||
|
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, SubscriptionState> _subscriptions = new();
|
||||||
|
private long _nextSubscriptionId;
|
||||||
|
|
||||||
|
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||||
|
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
|
||||||
|
|
||||||
|
// Single-host probe state — Modbus driver talks to exactly one endpoint so the "hosts"
|
||||||
|
// collection has at most one entry. HostName is the Host:Port string so the Admin UI can
|
||||||
|
// display the PLC endpoint uniformly with Galaxy platforms/engines.
|
||||||
|
private readonly object _probeLock = new();
|
||||||
|
private HostState _hostState = HostState.Unknown;
|
||||||
|
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
|
||||||
|
private CancellationTokenSource? _probeCts;
|
||||||
private readonly ModbusDriverOptions _options = options;
|
private readonly ModbusDriverOptions _options = options;
|
||||||
private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory =
|
private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory =
|
||||||
transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout));
|
transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout));
|
||||||
@@ -39,6 +55,15 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
|||||||
await _transport.ConnectAsync(cancellationToken).ConfigureAwait(false);
|
await _transport.ConnectAsync(cancellationToken).ConfigureAwait(false);
|
||||||
foreach (var t in _options.Tags) _tagsByName[t.Name] = t;
|
foreach (var t in _options.Tags) _tagsByName[t.Name] = t;
|
||||||
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
|
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
|
||||||
|
|
||||||
|
// PR 23: kick off the probe loop once the transport is up. Initial state stays
|
||||||
|
// Unknown until the first probe tick succeeds — avoids broadcasting a premature
|
||||||
|
// Running transition before any register round-trip has happened.
|
||||||
|
if (_options.Probe.Enabled)
|
||||||
|
{
|
||||||
|
_probeCts = new CancellationTokenSource();
|
||||||
|
_ = Task.Run(() => ProbeLoopAsync(_probeCts.Token), _probeCts.Token);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -55,6 +80,17 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
|||||||
|
|
||||||
public async Task ShutdownAsync(CancellationToken cancellationToken)
|
public async Task ShutdownAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
|
try { _probeCts?.Cancel(); } catch { }
|
||||||
|
_probeCts?.Dispose();
|
||||||
|
_probeCts = null;
|
||||||
|
|
||||||
|
foreach (var state in _subscriptions.Values)
|
||||||
|
{
|
||||||
|
try { state.Cts.Cancel(); } catch { }
|
||||||
|
state.Cts.Dispose();
|
||||||
|
}
|
||||||
|
_subscriptions.Clear();
|
||||||
|
|
||||||
if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false);
|
if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false);
|
||||||
_transport = null;
|
_transport = null;
|
||||||
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
|
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
|
||||||
@@ -220,6 +256,145 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- ISubscribable (polling overlay) ----
|
||||||
|
|
||||||
|
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||||
|
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var id = Interlocked.Increment(ref _nextSubscriptionId);
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
var interval = publishingInterval < TimeSpan.FromMilliseconds(100)
|
||||||
|
? TimeSpan.FromMilliseconds(100) // floor — Modbus can't sustain < 100ms polling reliably
|
||||||
|
: publishingInterval;
|
||||||
|
var handle = new ModbusSubscriptionHandle(id);
|
||||||
|
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
|
||||||
|
_subscriptions[id] = state;
|
||||||
|
_ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
|
||||||
|
return Task.FromResult<ISubscriptionHandle>(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
if (handle is ModbusSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
|
||||||
|
{
|
||||||
|
state.Cts.Cancel();
|
||||||
|
state.Cts.Dispose();
|
||||||
|
}
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
|
||||||
|
{
|
||||||
|
// Initial-data push: read every tag once at subscribe time so OPC UA clients see the
|
||||||
|
// current value per Part 4 convention, even if the value never changes thereafter.
|
||||||
|
try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
|
||||||
|
catch (OperationCanceledException) { return; }
|
||||||
|
catch { /* first-read error — polling continues */ }
|
||||||
|
|
||||||
|
while (!ct.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
|
||||||
|
catch (OperationCanceledException) { return; }
|
||||||
|
|
||||||
|
try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
|
||||||
|
catch (OperationCanceledException) { return; }
|
||||||
|
catch { /* transient polling error — loop continues, health surface reflects it */ }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false);
|
||||||
|
for (var i = 0; i < state.TagReferences.Count; i++)
|
||||||
|
{
|
||||||
|
var tagRef = state.TagReferences[i];
|
||||||
|
var current = snapshots[i];
|
||||||
|
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
|
||||||
|
|
||||||
|
// Raise on first read (forceRaise) OR when the boxed value differs from last-known.
|
||||||
|
if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
|
||||||
|
{
|
||||||
|
state.LastValues[tagRef] = current;
|
||||||
|
OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed record SubscriptionState(
|
||||||
|
ModbusSubscriptionHandle Handle,
|
||||||
|
IReadOnlyList<string> TagReferences,
|
||||||
|
TimeSpan Interval,
|
||||||
|
CancellationTokenSource Cts)
|
||||||
|
{
|
||||||
|
public System.Collections.Concurrent.ConcurrentDictionary<string, DataValueSnapshot> LastValues { get; }
|
||||||
|
= new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed record ModbusSubscriptionHandle(long Id) : ISubscriptionHandle
|
||||||
|
{
|
||||||
|
public string DiagnosticId => $"modbus-sub-{Id}";
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- IHostConnectivityProbe ----
|
||||||
|
|
||||||
|
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
|
||||||
|
{
|
||||||
|
lock (_probeLock)
|
||||||
|
return [new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc)];
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Host identifier surfaced to <c>IHostConnectivityProbe.GetHostStatuses</c> and the Admin UI.
|
||||||
|
/// Formatted as <c>host:port</c> so multiple Modbus drivers in the same server disambiguate
|
||||||
|
/// by endpoint without needing the driver-instance-id in the Admin dashboard.
|
||||||
|
/// </summary>
|
||||||
|
public string HostName => $"{_options.Host}:{_options.Port}";
|
||||||
|
|
||||||
|
private async Task ProbeLoopAsync(CancellationToken ct)
|
||||||
|
{
|
||||||
|
var transport = _transport; // captured reference; disposal tears the loop down via ct
|
||||||
|
while (!ct.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
var success = false;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var probeCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||||
|
probeCts.CancelAfter(_options.Probe.Timeout);
|
||||||
|
var pdu = new byte[] { 0x03,
|
||||||
|
(byte)(_options.Probe.ProbeAddress >> 8),
|
||||||
|
(byte)(_options.Probe.ProbeAddress & 0xFF), 0x00, 0x01 };
|
||||||
|
_ = await transport!.SendAsync(_options.UnitId, pdu, probeCts.Token).ConfigureAwait(false);
|
||||||
|
success = true;
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (ct.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
// transport / timeout / exception PDU — treated as Stopped below
|
||||||
|
}
|
||||||
|
|
||||||
|
TransitionTo(success ? HostState.Running : HostState.Stopped);
|
||||||
|
|
||||||
|
try { await Task.Delay(_options.Probe.Interval, ct).ConfigureAwait(false); }
|
||||||
|
catch (OperationCanceledException) { return; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void TransitionTo(HostState newState)
|
||||||
|
{
|
||||||
|
HostState old;
|
||||||
|
lock (_probeLock)
|
||||||
|
{
|
||||||
|
old = _hostState;
|
||||||
|
if (old == newState) return;
|
||||||
|
_hostState = newState;
|
||||||
|
_hostStateChangedUtc = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState));
|
||||||
|
}
|
||||||
|
|
||||||
// ---- codec ----
|
// ---- codec ----
|
||||||
|
|
||||||
internal static ushort RegisterCount(ModbusDataType t) => t switch
|
internal static ushort RegisterCount(ModbusDataType t) => t switch
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -14,6 +16,24 @@ public sealed class ModbusDriverOptions
|
|||||||
|
|
||||||
/// <summary>Pre-declared tag map. Modbus has no discovery protocol — the driver returns exactly these.</summary>
|
/// <summary>Pre-declared tag map. Modbus has no discovery protocol — the driver returns exactly these.</summary>
|
||||||
public IReadOnlyList<ModbusTagDefinition> Tags { get; init; } = [];
|
public IReadOnlyList<ModbusTagDefinition> Tags { get; init; } = [];
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Background connectivity-probe settings. When <see cref="ModbusProbeOptions.Enabled"/>
|
||||||
|
/// is true the driver runs a tick loop that issues a cheap FC03 at register 0 every
|
||||||
|
/// <see cref="ModbusProbeOptions.Interval"/> and raises <c>OnHostStatusChanged</c> on
|
||||||
|
/// Running ↔ Stopped transitions. The Admin UI / OPC UA clients see the state through
|
||||||
|
/// <see cref="IHostConnectivityProbe"/>.
|
||||||
|
/// </summary>
|
||||||
|
public ModbusProbeOptions Probe { get; init; } = new();
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed class ModbusProbeOptions
|
||||||
|
{
|
||||||
|
public bool Enabled { get; init; } = true;
|
||||||
|
public TimeSpan Interval { get; init; } = TimeSpan.FromSeconds(5);
|
||||||
|
public TimeSpan Timeout { get; init; } = TimeSpan.FromSeconds(2);
|
||||||
|
/// <summary>Register to read for the probe. Zero is usually safe; override for PLCs that lock register 0.</summary>
|
||||||
|
public ushort ProbeAddress { get; init; } = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
208
tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProbeTests.cs
Normal file
208
tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusProbeTests.cs
Normal file
@@ -0,0 +1,208 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class ModbusProbeTests
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Transport fake the probe tests flip between "responding" and "unreachable" to
|
||||||
|
/// exercise the state machine. Calls to SendAsync with FC=0x03 count as probe traffic
|
||||||
|
/// (the driver's probe loop issues exactly that shape).
|
||||||
|
/// </summary>
|
||||||
|
private sealed class FlappyTransport : IModbusTransport
|
||||||
|
{
|
||||||
|
public volatile bool Reachable = true;
|
||||||
|
public int ProbeCount;
|
||||||
|
|
||||||
|
public Task ConnectAsync(CancellationToken ct) => Task.CompletedTask;
|
||||||
|
|
||||||
|
public Task<byte[]> SendAsync(byte unitId, byte[] pdu, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (pdu[0] == 0x03) Interlocked.Increment(ref ProbeCount);
|
||||||
|
if (!Reachable)
|
||||||
|
return Task.FromException<byte[]>(new IOException("transport unreachable"));
|
||||||
|
|
||||||
|
// Happy path — return a valid FC03 response for 1 register at addr.
|
||||||
|
if (pdu[0] == 0x03)
|
||||||
|
{
|
||||||
|
var qty = (ushort)((pdu[3] << 8) | pdu[4]);
|
||||||
|
var resp = new byte[2 + qty * 2];
|
||||||
|
resp[0] = 0x03;
|
||||||
|
resp[1] = (byte)(qty * 2);
|
||||||
|
return Task.FromResult(resp);
|
||||||
|
}
|
||||||
|
return Task.FromException<byte[]>(new NotSupportedException());
|
||||||
|
}
|
||||||
|
|
||||||
|
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static (ModbusDriver drv, FlappyTransport fake) NewDriver(ModbusProbeOptions probe)
|
||||||
|
{
|
||||||
|
var fake = new FlappyTransport();
|
||||||
|
var opts = new ModbusDriverOptions { Host = "fake", Port = 502, Probe = probe };
|
||||||
|
return (new ModbusDriver(opts, "modbus-1", _ => fake), fake);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Initial_state_is_Unknown_before_first_probe_tick()
|
||||||
|
{
|
||||||
|
var (drv, _) = NewDriver(new ModbusProbeOptions { Enabled = false });
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
|
||||||
|
var statuses = drv.GetHostStatuses();
|
||||||
|
statuses.Count.ShouldBe(1);
|
||||||
|
statuses[0].State.ShouldBe(HostState.Unknown);
|
||||||
|
statuses[0].HostName.ShouldBe("fake:502");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task First_successful_probe_transitions_to_Running()
|
||||||
|
{
|
||||||
|
var (drv, fake) = NewDriver(new ModbusProbeOptions
|
||||||
|
{
|
||||||
|
Enabled = true,
|
||||||
|
Interval = TimeSpan.FromMilliseconds(150),
|
||||||
|
Timeout = TimeSpan.FromSeconds(1),
|
||||||
|
});
|
||||||
|
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||||
|
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||||
|
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
|
||||||
|
// Wait for the first probe to complete.
|
||||||
|
var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(2);
|
||||||
|
while (fake.ProbeCount == 0 && DateTime.UtcNow < deadline) await Task.Delay(25);
|
||||||
|
|
||||||
|
// Then wait for the event to actually arrive.
|
||||||
|
deadline = DateTime.UtcNow + TimeSpan.FromSeconds(1);
|
||||||
|
while (transitions.Count == 0 && DateTime.UtcNow < deadline) await Task.Delay(25);
|
||||||
|
|
||||||
|
transitions.Count.ShouldBeGreaterThanOrEqualTo(1);
|
||||||
|
transitions.TryDequeue(out var t).ShouldBeTrue();
|
||||||
|
t!.OldState.ShouldBe(HostState.Unknown);
|
||||||
|
t.NewState.ShouldBe(HostState.Running);
|
||||||
|
drv.GetHostStatuses()[0].State.ShouldBe(HostState.Running);
|
||||||
|
|
||||||
|
await drv.ShutdownAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Transport_failure_transitions_to_Stopped()
|
||||||
|
{
|
||||||
|
var (drv, fake) = NewDriver(new ModbusProbeOptions
|
||||||
|
{
|
||||||
|
Enabled = true,
|
||||||
|
Interval = TimeSpan.FromMilliseconds(150),
|
||||||
|
Timeout = TimeSpan.FromSeconds(1),
|
||||||
|
});
|
||||||
|
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||||
|
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||||
|
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
fake.Reachable = false;
|
||||||
|
await WaitForStateAsync(drv, HostState.Stopped, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
transitions.Select(t => t.NewState).ShouldContain(HostState.Stopped);
|
||||||
|
await drv.ShutdownAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Recovery_transitions_Stopped_back_to_Running()
|
||||||
|
{
|
||||||
|
var (drv, fake) = NewDriver(new ModbusProbeOptions
|
||||||
|
{
|
||||||
|
Enabled = true,
|
||||||
|
Interval = TimeSpan.FromMilliseconds(150),
|
||||||
|
Timeout = TimeSpan.FromSeconds(1),
|
||||||
|
});
|
||||||
|
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||||
|
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||||
|
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
fake.Reachable = false;
|
||||||
|
await WaitForStateAsync(drv, HostState.Stopped, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
fake.Reachable = true;
|
||||||
|
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
// We expect at minimum: Unknown→Running, Running→Stopped, Stopped→Running.
|
||||||
|
transitions.Count.ShouldBeGreaterThanOrEqualTo(3);
|
||||||
|
await drv.ShutdownAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Repeated_successful_probes_do_not_generate_duplicate_Running_events()
|
||||||
|
{
|
||||||
|
var (drv, _) = NewDriver(new ModbusProbeOptions
|
||||||
|
{
|
||||||
|
Enabled = true,
|
||||||
|
Interval = TimeSpan.FromMilliseconds(100),
|
||||||
|
Timeout = TimeSpan.FromSeconds(1),
|
||||||
|
});
|
||||||
|
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||||
|
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||||
|
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
|
||||||
|
await Task.Delay(500); // several more probe ticks, all successful — state shouldn't thrash
|
||||||
|
|
||||||
|
transitions.Count.ShouldBe(1); // only the initial Unknown→Running
|
||||||
|
await drv.ShutdownAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Disabled_probe_stays_Unknown_and_fires_no_events()
|
||||||
|
{
|
||||||
|
var (drv, _) = NewDriver(new ModbusProbeOptions { Enabled = false });
|
||||||
|
var transitions = new ConcurrentQueue<HostStatusChangedEventArgs>();
|
||||||
|
drv.OnHostStatusChanged += (_, e) => transitions.Enqueue(e);
|
||||||
|
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
await Task.Delay(300);
|
||||||
|
|
||||||
|
transitions.Count.ShouldBe(0);
|
||||||
|
drv.GetHostStatuses()[0].State.ShouldBe(HostState.Unknown);
|
||||||
|
await drv.ShutdownAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Shutdown_stops_the_probe_loop()
|
||||||
|
{
|
||||||
|
var (drv, fake) = NewDriver(new ModbusProbeOptions
|
||||||
|
{
|
||||||
|
Enabled = true,
|
||||||
|
Interval = TimeSpan.FromMilliseconds(100),
|
||||||
|
Timeout = TimeSpan.FromSeconds(1),
|
||||||
|
});
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
await WaitForStateAsync(drv, HostState.Running, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
var before = fake.ProbeCount;
|
||||||
|
await drv.ShutdownAsync(CancellationToken.None);
|
||||||
|
await Task.Delay(400);
|
||||||
|
|
||||||
|
// A handful of in-flight ticks may complete after shutdown in a narrow race; the
|
||||||
|
// contract is that the loop stops scheduling new ones. Tolerate ≤1 extra.
|
||||||
|
(fake.ProbeCount - before).ShouldBeLessThanOrEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task WaitForStateAsync(ModbusDriver drv, HostState expected, TimeSpan timeout)
|
||||||
|
{
|
||||||
|
var deadline = DateTime.UtcNow + timeout;
|
||||||
|
while (DateTime.UtcNow < deadline)
|
||||||
|
{
|
||||||
|
if (drv.GetHostStatuses()[0].State == expected) return;
|
||||||
|
await Task.Delay(25);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,180 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class ModbusSubscriptionTests
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Lightweight fake transport the subscription tests drive through — only the FC03
|
||||||
|
/// (Read Holding Registers) path is used. Mutating <see cref="HoldingRegisters"/>
|
||||||
|
/// between polls is how each test simulates a PLC value change.
|
||||||
|
/// </summary>
|
||||||
|
private sealed class FakeTransport : IModbusTransport
|
||||||
|
{
|
||||||
|
public readonly ushort[] HoldingRegisters = new ushort[256];
|
||||||
|
public Task ConnectAsync(CancellationToken ct) => Task.CompletedTask;
|
||||||
|
|
||||||
|
public Task<byte[]> SendAsync(byte unitId, byte[] pdu, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (pdu[0] != 0x03) return Task.FromException<byte[]>(new NotSupportedException("FC not supported"));
|
||||||
|
var addr = (ushort)((pdu[1] << 8) | pdu[2]);
|
||||||
|
var qty = (ushort)((pdu[3] << 8) | pdu[4]);
|
||||||
|
var resp = new byte[2 + qty * 2];
|
||||||
|
resp[0] = 0x03;
|
||||||
|
resp[1] = (byte)(qty * 2);
|
||||||
|
for (var i = 0; i < qty; i++)
|
||||||
|
{
|
||||||
|
resp[2 + i * 2] = (byte)(HoldingRegisters[addr + i] >> 8);
|
||||||
|
resp[3 + i * 2] = (byte)(HoldingRegisters[addr + i] & 0xFF);
|
||||||
|
}
|
||||||
|
return Task.FromResult(resp);
|
||||||
|
}
|
||||||
|
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static (ModbusDriver drv, FakeTransport fake) NewDriver(params ModbusTagDefinition[] tags)
|
||||||
|
{
|
||||||
|
var fake = new FakeTransport();
|
||||||
|
var opts = new ModbusDriverOptions { Host = "fake", Tags = tags };
|
||||||
|
return (new ModbusDriver(opts, "modbus-1", _ => fake), fake);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Initial_poll_raises_OnDataChange_for_every_subscribed_tag()
|
||||||
|
{
|
||||||
|
var (drv, fake) = NewDriver(
|
||||||
|
new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16),
|
||||||
|
new ModbusTagDefinition("Temp", ModbusRegion.HoldingRegisters, 1, ModbusDataType.Int16));
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
fake.HoldingRegisters[0] = 100;
|
||||||
|
fake.HoldingRegisters[1] = 200;
|
||||||
|
|
||||||
|
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||||
|
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||||
|
|
||||||
|
var handle = await drv.SubscribeAsync(["Level", "Temp"], TimeSpan.FromMilliseconds(200), CancellationToken.None);
|
||||||
|
await WaitForCountAsync(events, 2, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
events.Select(e => e.FullReference).ShouldContain("Level");
|
||||||
|
events.Select(e => e.FullReference).ShouldContain("Temp");
|
||||||
|
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Unchanged_values_do_not_raise_after_initial_poll()
|
||||||
|
{
|
||||||
|
var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
fake.HoldingRegisters[0] = 100;
|
||||||
|
|
||||||
|
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||||
|
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||||
|
|
||||||
|
var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||||
|
await Task.Delay(500); // ~5 poll cycles at 100ms, value stable the whole time
|
||||||
|
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||||
|
|
||||||
|
events.Count.ShouldBe(1); // only the initial-data push, no change events after
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Value_change_between_polls_raises_OnDataChange()
|
||||||
|
{
|
||||||
|
var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
fake.HoldingRegisters[0] = 100;
|
||||||
|
|
||||||
|
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||||
|
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||||
|
|
||||||
|
var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||||
|
await WaitForCountAsync(events, 1, TimeSpan.FromSeconds(1));
|
||||||
|
fake.HoldingRegisters[0] = 200; // simulate PLC update
|
||||||
|
await WaitForCountAsync(events, 2, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||||
|
events.Count.ShouldBeGreaterThanOrEqualTo(2);
|
||||||
|
events.Last().Snapshot.Value.ShouldBe((short)200);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Unsubscribe_stops_the_polling_loop()
|
||||||
|
{
|
||||||
|
var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
|
||||||
|
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||||
|
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||||
|
|
||||||
|
var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||||
|
await WaitForCountAsync(events, 1, TimeSpan.FromSeconds(1));
|
||||||
|
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||||
|
|
||||||
|
var countAfterUnsub = events.Count;
|
||||||
|
fake.HoldingRegisters[0] = 999; // would trigger a change if still polling
|
||||||
|
await Task.Delay(400);
|
||||||
|
events.Count.ShouldBe(countAfterUnsub);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task SubscribeAsync_floors_intervals_below_100ms()
|
||||||
|
{
|
||||||
|
var (drv, _) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
|
||||||
|
// 10ms requested — implementation floors to 100ms. We verify indirectly: over 300ms, a
|
||||||
|
// 10ms interval would produce many more events than a 100ms interval would on a stable
|
||||||
|
// value. Since the value is unchanged, we only expect the initial-data push (1 event).
|
||||||
|
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||||
|
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||||
|
|
||||||
|
var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(10), CancellationToken.None);
|
||||||
|
await Task.Delay(300);
|
||||||
|
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||||
|
|
||||||
|
events.Count.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Multiple_subscriptions_fire_independently()
|
||||||
|
{
|
||||||
|
var (drv, fake) = NewDriver(
|
||||||
|
new ModbusTagDefinition("A", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16),
|
||||||
|
new ModbusTagDefinition("B", ModbusRegion.HoldingRegisters, 1, ModbusDataType.Int16));
|
||||||
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||||
|
|
||||||
|
var eventsA = new ConcurrentQueue<DataChangeEventArgs>();
|
||||||
|
var eventsB = new ConcurrentQueue<DataChangeEventArgs>();
|
||||||
|
drv.OnDataChange += (_, e) =>
|
||||||
|
{
|
||||||
|
if (e.FullReference == "A") eventsA.Enqueue(e);
|
||||||
|
else if (e.FullReference == "B") eventsB.Enqueue(e);
|
||||||
|
};
|
||||||
|
|
||||||
|
var ha = await drv.SubscribeAsync(["A"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||||
|
var hb = await drv.SubscribeAsync(["B"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||||
|
await WaitForCountAsync(eventsA, 1, TimeSpan.FromSeconds(1));
|
||||||
|
await WaitForCountAsync(eventsB, 1, TimeSpan.FromSeconds(1));
|
||||||
|
|
||||||
|
await drv.UnsubscribeAsync(ha, CancellationToken.None);
|
||||||
|
var aCount = eventsA.Count;
|
||||||
|
fake.HoldingRegisters[1] = 77; // only B should pick this up
|
||||||
|
await WaitForCountAsync(eventsB, 2, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
eventsA.Count.ShouldBe(aCount); // unchanged since unsubscribe
|
||||||
|
eventsB.Count.ShouldBeGreaterThanOrEqualTo(2);
|
||||||
|
await drv.UnsubscribeAsync(hb, CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task WaitForCountAsync<T>(ConcurrentQueue<T> q, int target, TimeSpan timeout)
|
||||||
|
{
|
||||||
|
var deadline = DateTime.UtcNow + timeout;
|
||||||
|
while (q.Count < target && DateTime.UtcNow < deadline)
|
||||||
|
await Task.Delay(25);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user