Compare commits
4 Commits
phase-6-4-
...
abcip-pr1-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4ab587707f | ||
| 2172d49d2e | |||
|
|
ae8f226e45 | ||
| e032045247 |
146
src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs
Normal file
146
src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs
Normal file
@@ -0,0 +1,146 @@
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
/// <summary>
|
||||
/// Shared poll-based subscription engine for drivers whose underlying protocol has no
|
||||
/// native push model (Modbus, AB CIP, S7, FOCAS). Owns one background Task per subscription
|
||||
/// that periodically invokes the supplied reader, diffs each snapshot against the last
|
||||
/// known value, and dispatches a change callback per changed tag. Extracted from
|
||||
/// <c>ModbusDriver</c> (AB CIP PR 1) so poll-based drivers don't each re-ship the loop,
|
||||
/// floor logic, and lifecycle plumbing.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>The engine is read-path agnostic: it calls the supplied <c>reader</c> delegate
|
||||
/// and trusts the driver to map protocol errors into <see cref="DataValueSnapshot.StatusCode"/>.
|
||||
/// Callbacks fire on: (a) the first poll after subscribe (initial-data push per the OPC UA
|
||||
/// Part 4 convention), (b) any subsequent poll where the boxed value or status code differs
|
||||
/// from the previously-seen snapshot.</para>
|
||||
///
|
||||
/// <para>Exceptions thrown by the reader on the initial poll or any subsequent poll are
|
||||
/// swallowed — the loop continues on the next tick. The driver's own health surface is
|
||||
/// where transient poll failures should be reported; the engine intentionally does not
|
||||
/// double-book that responsibility.</para>
|
||||
/// </remarks>
|
||||
public sealed class PollGroupEngine : IAsyncDisposable
|
||||
{
|
||||
private readonly Func<IReadOnlyList<string>, CancellationToken, Task<IReadOnlyList<DataValueSnapshot>>> _reader;
|
||||
private readonly Action<ISubscriptionHandle, string, DataValueSnapshot> _onChange;
|
||||
private readonly TimeSpan _minInterval;
|
||||
private readonly ConcurrentDictionary<long, SubscriptionState> _subscriptions = new();
|
||||
private long _nextId;
|
||||
|
||||
/// <summary>Default floor for publishing intervals — matches the Modbus 100 ms cap.</summary>
|
||||
public static readonly TimeSpan DefaultMinInterval = TimeSpan.FromMilliseconds(100);
|
||||
|
||||
/// <param name="reader">Driver-supplied batch reader; snapshots MUST be returned in the same
|
||||
/// order as the input references.</param>
|
||||
/// <param name="onChange">Callback invoked per changed tag — the driver forwards to its own
|
||||
/// <see cref="ISubscribable.OnDataChange"/> event.</param>
|
||||
/// <param name="minInterval">Interval floor; anything below is clamped. Defaults to 100 ms
|
||||
/// per <see cref="DefaultMinInterval"/>.</param>
|
||||
public PollGroupEngine(
|
||||
Func<IReadOnlyList<string>, CancellationToken, Task<IReadOnlyList<DataValueSnapshot>>> reader,
|
||||
Action<ISubscriptionHandle, string, DataValueSnapshot> onChange,
|
||||
TimeSpan? minInterval = null)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(reader);
|
||||
ArgumentNullException.ThrowIfNull(onChange);
|
||||
_reader = reader;
|
||||
_onChange = onChange;
|
||||
_minInterval = minInterval ?? DefaultMinInterval;
|
||||
}
|
||||
|
||||
/// <summary>Register a new polled subscription and start its background loop.</summary>
|
||||
public ISubscriptionHandle Subscribe(IReadOnlyList<string> fullReferences, TimeSpan publishingInterval)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(fullReferences);
|
||||
var id = Interlocked.Increment(ref _nextId);
|
||||
var cts = new CancellationTokenSource();
|
||||
var interval = publishingInterval < _minInterval ? _minInterval : publishingInterval;
|
||||
var handle = new PollSubscriptionHandle(id);
|
||||
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
|
||||
_subscriptions[id] = state;
|
||||
_ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
|
||||
return handle;
|
||||
}
|
||||
|
||||
/// <summary>Cancel the background loop for a handle returned by <see cref="Subscribe"/>.</summary>
|
||||
/// <returns><c>true</c> when the handle was known to the engine and has been torn down.</returns>
|
||||
public bool Unsubscribe(ISubscriptionHandle handle)
|
||||
{
|
||||
if (handle is PollSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
|
||||
{
|
||||
try { state.Cts.Cancel(); } catch { }
|
||||
state.Cts.Dispose();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <summary>Snapshot of active subscription count — exposed for driver diagnostics.</summary>
|
||||
public int ActiveSubscriptionCount => _subscriptions.Count;
|
||||
|
||||
private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
|
||||
{
|
||||
// Initial-data push: every subscribed tag fires once at subscribe time regardless of
|
||||
// whether it has changed, satisfying OPC UA Part 4 initial-value semantics.
|
||||
try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
catch { /* first-read error tolerated — loop continues */ }
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
|
||||
try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
catch { /* transient poll error — loop continues, driver health surface logs it */ }
|
||||
}
|
||||
}
|
||||
|
||||
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
|
||||
{
|
||||
var snapshots = await _reader(state.TagReferences, ct).ConfigureAwait(false);
|
||||
for (var i = 0; i < state.TagReferences.Count; i++)
|
||||
{
|
||||
var tagRef = state.TagReferences[i];
|
||||
var current = snapshots[i];
|
||||
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
|
||||
|
||||
if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
|
||||
{
|
||||
state.LastValues[tagRef] = current;
|
||||
_onChange(state.Handle, tagRef, current);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Cancel every active subscription. Idempotent.</summary>
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
foreach (var state in _subscriptions.Values)
|
||||
{
|
||||
try { state.Cts.Cancel(); } catch { }
|
||||
state.Cts.Dispose();
|
||||
}
|
||||
_subscriptions.Clear();
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
private sealed record SubscriptionState(
|
||||
PollSubscriptionHandle Handle,
|
||||
IReadOnlyList<string> TagReferences,
|
||||
TimeSpan Interval,
|
||||
CancellationTokenSource Cts)
|
||||
{
|
||||
public ConcurrentDictionary<string, DataValueSnapshot> LastValues { get; }
|
||||
= new(StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
private sealed record PollSubscriptionHandle(long Id) : ISubscriptionHandle
|
||||
{
|
||||
public string DiagnosticId => $"poll-sub-{Id}";
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ public sealed class CapabilityInvoker
|
||||
private readonly string _driverInstanceId;
|
||||
private readonly string _driverType;
|
||||
private readonly Func<DriverResilienceOptions> _optionsAccessor;
|
||||
private readonly DriverResilienceStatusTracker? _statusTracker;
|
||||
|
||||
/// <summary>
|
||||
/// Construct an invoker for one driver instance.
|
||||
@@ -33,11 +34,13 @@ public sealed class CapabilityInvoker
|
||||
/// pipeline-invalidate can take effect without restarting the invoker.
|
||||
/// </param>
|
||||
/// <param name="driverType">Driver type name for structured-log enrichment (e.g. <c>"Modbus"</c>).</param>
|
||||
/// <param name="statusTracker">Optional resilience-status tracker. When wired, every capability call records start/complete so Admin <c>/hosts</c> can surface <see cref="ResilienceStatusSnapshot.CurrentInFlight"/> as the bulkhead-depth proxy.</param>
|
||||
public CapabilityInvoker(
|
||||
DriverResiliencePipelineBuilder builder,
|
||||
string driverInstanceId,
|
||||
Func<DriverResilienceOptions> optionsAccessor,
|
||||
string driverType = "Unknown")
|
||||
string driverType = "Unknown",
|
||||
DriverResilienceStatusTracker? statusTracker = null)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(builder);
|
||||
ArgumentNullException.ThrowIfNull(optionsAccessor);
|
||||
@@ -46,6 +49,7 @@ public sealed class CapabilityInvoker
|
||||
_driverInstanceId = driverInstanceId;
|
||||
_driverType = driverType;
|
||||
_optionsAccessor = optionsAccessor;
|
||||
_statusTracker = statusTracker;
|
||||
}
|
||||
|
||||
/// <summary>Execute a capability call returning a value, honoring the per-capability pipeline.</summary>
|
||||
@@ -59,9 +63,17 @@ public sealed class CapabilityInvoker
|
||||
ArgumentNullException.ThrowIfNull(callSite);
|
||||
|
||||
var pipeline = ResolvePipeline(capability, hostName);
|
||||
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
||||
_statusTracker?.RecordCallStart(_driverInstanceId, hostName);
|
||||
try
|
||||
{
|
||||
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
||||
{
|
||||
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_statusTracker?.RecordCallComplete(_driverInstanceId, hostName);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,9 +87,17 @@ public sealed class CapabilityInvoker
|
||||
ArgumentNullException.ThrowIfNull(callSite);
|
||||
|
||||
var pipeline = ResolvePipeline(capability, hostName);
|
||||
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
||||
_statusTracker?.RecordCallStart(_driverInstanceId, hostName);
|
||||
try
|
||||
{
|
||||
await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
||||
{
|
||||
await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_statusTracker?.RecordCallComplete(_driverInstanceId, hostName);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -81,6 +81,29 @@ public sealed class DriverResilienceStatusTracker
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Record the entry of a capability call for this (instance, host). Increments the
|
||||
/// in-flight counter used as the <see cref="ResilienceStatusSnapshot.CurrentInFlight"/>
|
||||
/// surface (a cheap stand-in for Polly bulkhead depth). Paired with
|
||||
/// <see cref="RecordCallComplete"/>; callers use try/finally.
|
||||
/// </summary>
|
||||
public void RecordCallStart(string driverInstanceId, string hostName)
|
||||
{
|
||||
var key = new StatusKey(driverInstanceId, hostName);
|
||||
_status.AddOrUpdate(key,
|
||||
_ => new ResilienceStatusSnapshot { CurrentInFlight = 1 },
|
||||
(_, existing) => existing with { CurrentInFlight = existing.CurrentInFlight + 1 });
|
||||
}
|
||||
|
||||
/// <summary>Paired with <see cref="RecordCallStart"/> — decrements the in-flight counter.</summary>
|
||||
public void RecordCallComplete(string driverInstanceId, string hostName)
|
||||
{
|
||||
var key = new StatusKey(driverInstanceId, hostName);
|
||||
_status.AddOrUpdate(key,
|
||||
_ => new ResilienceStatusSnapshot { CurrentInFlight = 0 }, // start-without-complete shouldn't happen; clamp to 0
|
||||
(_, existing) => existing with { CurrentInFlight = Math.Max(0, existing.CurrentInFlight - 1) });
|
||||
}
|
||||
|
||||
/// <summary>Snapshot of a specific (instance, host) pair; null if no counters recorded yet.</summary>
|
||||
public ResilienceStatusSnapshot? TryGet(string driverInstanceId, string hostName) =>
|
||||
_status.TryGetValue(new StatusKey(driverInstanceId, hostName), out var snapshot) ? snapshot : null;
|
||||
@@ -101,4 +124,12 @@ public sealed record ResilienceStatusSnapshot
|
||||
public long BaselineFootprintBytes { get; init; }
|
||||
public long CurrentFootprintBytes { get; init; }
|
||||
public DateTime LastSampledUtc { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// In-flight capability calls against this (instance, host). Bumped on call entry +
|
||||
/// decremented on completion. Feeds <c>DriverInstanceResilienceStatus.CurrentBulkheadDepth</c>
|
||||
/// for Admin <c>/hosts</c> — a cheap proxy for the Polly bulkhead depth until the full
|
||||
/// telemetry observer lands.
|
||||
/// </summary>
|
||||
public int CurrentInFlight { get; init; }
|
||||
}
|
||||
|
||||
@@ -11,19 +11,17 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
||||
/// <c>IReadable</c>/<c>IWritable</c> abstractions generalize beyond Galaxy.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Scope limits: synchronous Read/Write only, no subscriptions (Modbus has no push model;
|
||||
/// subscriptions would need a polling loop over the declared tags — additive PR). Historian
|
||||
/// + alarm capabilities are out of scope (the protocol doesn't express them).
|
||||
/// Scope limits: Historian + alarm capabilities are out of scope (the protocol doesn't
|
||||
/// express them). Subscriptions overlay a polling loop via the shared
|
||||
/// <see cref="PollGroupEngine"/> since Modbus has no native push model.
|
||||
/// </remarks>
|
||||
public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
|
||||
Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
|
||||
public sealed class ModbusDriver
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
|
||||
{
|
||||
// Active polling subscriptions. Each subscription owns a background Task that polls the
|
||||
// tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange
|
||||
// per changed tag. UnsubscribeAsync cancels the task via the CTS stored on the handle.
|
||||
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, SubscriptionState> _subscriptions = new();
|
||||
private long _nextSubscriptionId;
|
||||
// Polled subscriptions delegate to the shared PollGroupEngine. The driver only supplies
|
||||
// the reader + on-change bridge; the engine owns the loop, interval floor, and lifecycle.
|
||||
private readonly PollGroupEngine _poll;
|
||||
private readonly string _driverInstanceId;
|
||||
|
||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
|
||||
@@ -35,15 +33,28 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
||||
private HostState _hostState = HostState.Unknown;
|
||||
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
|
||||
private CancellationTokenSource? _probeCts;
|
||||
private readonly ModbusDriverOptions _options = options;
|
||||
private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory =
|
||||
transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout, o.AutoReconnect));
|
||||
private readonly ModbusDriverOptions _options;
|
||||
private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory;
|
||||
|
||||
private IModbusTransport? _transport;
|
||||
private DriverHealth _health = new(DriverState.Unknown, null, null);
|
||||
private readonly Dictionary<string, ModbusTagDefinition> _tagsByName = new(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
public string DriverInstanceId => driverInstanceId;
|
||||
public ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
|
||||
Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
_options = options;
|
||||
_driverInstanceId = driverInstanceId;
|
||||
_transportFactory = transportFactory
|
||||
?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout, o.AutoReconnect));
|
||||
_poll = new PollGroupEngine(
|
||||
reader: ReadAsync,
|
||||
onChange: (handle, tagRef, snapshot) =>
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot)));
|
||||
}
|
||||
|
||||
public string DriverInstanceId => _driverInstanceId;
|
||||
public string DriverType => "Modbus";
|
||||
|
||||
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
|
||||
@@ -84,12 +95,7 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
||||
_probeCts?.Dispose();
|
||||
_probeCts = null;
|
||||
|
||||
foreach (var state in _subscriptions.Values)
|
||||
{
|
||||
try { state.Cts.Cancel(); } catch { }
|
||||
state.Cts.Dispose();
|
||||
}
|
||||
_subscriptions.Clear();
|
||||
await _poll.DisposeAsync().ConfigureAwait(false);
|
||||
|
||||
if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false);
|
||||
_transport = null;
|
||||
@@ -303,85 +309,18 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
||||
}
|
||||
}
|
||||
|
||||
// ---- ISubscribable (polling overlay) ----
|
||||
// ---- ISubscribable (polling overlay via shared engine) ----
|
||||
|
||||
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||||
{
|
||||
var id = Interlocked.Increment(ref _nextSubscriptionId);
|
||||
var cts = new CancellationTokenSource();
|
||||
var interval = publishingInterval < TimeSpan.FromMilliseconds(100)
|
||||
? TimeSpan.FromMilliseconds(100) // floor — Modbus can't sustain < 100ms polling reliably
|
||||
: publishingInterval;
|
||||
var handle = new ModbusSubscriptionHandle(id);
|
||||
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
|
||||
_subscriptions[id] = state;
|
||||
_ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
|
||||
return Task.FromResult<ISubscriptionHandle>(handle);
|
||||
}
|
||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) =>
|
||||
Task.FromResult(_poll.Subscribe(fullReferences, publishingInterval));
|
||||
|
||||
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||
{
|
||||
if (handle is ModbusSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
|
||||
{
|
||||
state.Cts.Cancel();
|
||||
state.Cts.Dispose();
|
||||
}
|
||||
_poll.Unsubscribe(handle);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
|
||||
{
|
||||
// Initial-data push: read every tag once at subscribe time so OPC UA clients see the
|
||||
// current value per Part 4 convention, even if the value never changes thereafter.
|
||||
try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
catch { /* first-read error — polling continues */ }
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
|
||||
try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
catch { /* transient polling error — loop continues, health surface reflects it */ }
|
||||
}
|
||||
}
|
||||
|
||||
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
|
||||
{
|
||||
var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false);
|
||||
for (var i = 0; i < state.TagReferences.Count; i++)
|
||||
{
|
||||
var tagRef = state.TagReferences[i];
|
||||
var current = snapshots[i];
|
||||
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
|
||||
|
||||
// Raise on first read (forceRaise) OR when the boxed value differs from last-known.
|
||||
if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
|
||||
{
|
||||
state.LastValues[tagRef] = current;
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private sealed record SubscriptionState(
|
||||
ModbusSubscriptionHandle Handle,
|
||||
IReadOnlyList<string> TagReferences,
|
||||
TimeSpan Interval,
|
||||
CancellationTokenSource Cts)
|
||||
{
|
||||
public System.Collections.Concurrent.ConcurrentDictionary<string, DataValueSnapshot> LastValues { get; }
|
||||
= new(StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
private sealed record ModbusSubscriptionHandle(long Id) : ISubscriptionHandle
|
||||
{
|
||||
public string DiagnosticId => $"modbus-sub-{Id}";
|
||||
}
|
||||
|
||||
// ---- IHostConnectivityProbe ----
|
||||
|
||||
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
|
||||
|
||||
@@ -108,7 +108,7 @@ public sealed class ResilienceStatusPublisherHostedService : BackgroundService
|
||||
HostName = hostName,
|
||||
LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc,
|
||||
ConsecutiveFailures = counters.ConsecutiveFailures,
|
||||
CurrentBulkheadDepth = 0, // Phase 6.1 Stream A tracker doesn't emit bulkhead depth yet
|
||||
CurrentBulkheadDepth = counters.CurrentInFlight,
|
||||
LastRecycleUtc = counters.LastRecycleUtc,
|
||||
BaselineFootprintBytes = counters.BaselineFootprintBytes,
|
||||
CurrentFootprintBytes = counters.CurrentFootprintBytes,
|
||||
@@ -119,6 +119,7 @@ public sealed class ResilienceStatusPublisherHostedService : BackgroundService
|
||||
{
|
||||
existing.LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc;
|
||||
existing.ConsecutiveFailures = counters.ConsecutiveFailures;
|
||||
existing.CurrentBulkheadDepth = counters.CurrentInFlight;
|
||||
existing.LastRecycleUtc = counters.LastRecycleUtc;
|
||||
existing.BaselineFootprintBytes = counters.BaselineFootprintBytes;
|
||||
existing.CurrentFootprintBytes = counters.CurrentFootprintBytes;
|
||||
|
||||
@@ -0,0 +1,245 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class PollGroupEngineTests
|
||||
{
|
||||
private sealed class FakeSource
|
||||
{
|
||||
public ConcurrentDictionary<string, object?> Values { get; } = new();
|
||||
public int ReadCount;
|
||||
|
||||
public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
|
||||
IReadOnlyList<string> refs, CancellationToken ct)
|
||||
{
|
||||
Interlocked.Increment(ref ReadCount);
|
||||
var now = DateTime.UtcNow;
|
||||
IReadOnlyList<DataValueSnapshot> snapshots = refs
|
||||
.Select(r => Values.TryGetValue(r, out var v)
|
||||
? new DataValueSnapshot(v, 0u, now, now)
|
||||
: new DataValueSnapshot(null, 0x80340000u, null, now))
|
||||
.ToList();
|
||||
return Task.FromResult(snapshots);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Initial_poll_force_raises_every_subscribed_tag()
|
||||
{
|
||||
var src = new FakeSource();
|
||||
src.Values["A"] = 1;
|
||||
src.Values["B"] = "hello";
|
||||
|
||||
var events = new ConcurrentQueue<(ISubscriptionHandle h, string r, DataValueSnapshot s)>();
|
||||
await using var engine = new PollGroupEngine(src.ReadAsync,
|
||||
(h, r, s) => events.Enqueue((h, r, s)));
|
||||
|
||||
var handle = engine.Subscribe(["A", "B"], TimeSpan.FromMilliseconds(200));
|
||||
await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
|
||||
|
||||
events.Select(e => e.r).ShouldBe(["A", "B"], ignoreOrder: true);
|
||||
engine.Unsubscribe(handle).ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unchanged_value_raises_only_once()
|
||||
{
|
||||
var src = new FakeSource();
|
||||
src.Values["X"] = 42;
|
||||
|
||||
var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
|
||||
await using var engine = new PollGroupEngine(src.ReadAsync,
|
||||
(h, r, s) => events.Enqueue((h, r, s)));
|
||||
|
||||
var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
|
||||
await Task.Delay(500);
|
||||
engine.Unsubscribe(handle);
|
||||
|
||||
events.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Value_change_raises_new_event()
|
||||
{
|
||||
var src = new FakeSource();
|
||||
src.Values["X"] = 1;
|
||||
|
||||
var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
|
||||
await using var engine = new PollGroupEngine(src.ReadAsync,
|
||||
(h, r, s) => events.Enqueue((h, r, s)));
|
||||
|
||||
var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
|
||||
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
|
||||
src.Values["X"] = 2;
|
||||
await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
|
||||
|
||||
engine.Unsubscribe(handle);
|
||||
events.Last().Item3.Value.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unsubscribe_halts_the_loop()
|
||||
{
|
||||
var src = new FakeSource();
|
||||
src.Values["X"] = 1;
|
||||
|
||||
var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
|
||||
await using var engine = new PollGroupEngine(src.ReadAsync,
|
||||
(h, r, s) => events.Enqueue((h, r, s)));
|
||||
|
||||
var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
|
||||
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
|
||||
engine.Unsubscribe(handle).ShouldBeTrue();
|
||||
var afterUnsub = events.Count;
|
||||
|
||||
src.Values["X"] = 999;
|
||||
await Task.Delay(400);
|
||||
events.Count.ShouldBe(afterUnsub);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Interval_below_floor_is_clamped()
|
||||
{
|
||||
var src = new FakeSource();
|
||||
src.Values["X"] = 1;
|
||||
|
||||
var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
|
||||
await using var engine = new PollGroupEngine(src.ReadAsync,
|
||||
(h, r, s) => events.Enqueue((h, r, s)),
|
||||
minInterval: TimeSpan.FromMilliseconds(200));
|
||||
|
||||
var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(5));
|
||||
await Task.Delay(300);
|
||||
engine.Unsubscribe(handle);
|
||||
|
||||
// 300 ms window, 200 ms floor, stable value → initial push + at most 1 extra poll.
|
||||
// With zero changes only the initial-data push fires.
|
||||
events.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Multiple_subscriptions_are_independent()
|
||||
{
|
||||
var src = new FakeSource();
|
||||
src.Values["A"] = 1;
|
||||
src.Values["B"] = 2;
|
||||
|
||||
var a = new ConcurrentQueue<string>();
|
||||
var b = new ConcurrentQueue<string>();
|
||||
await using var engine = new PollGroupEngine(src.ReadAsync,
|
||||
(h, r, s) =>
|
||||
{
|
||||
if (r == "A") a.Enqueue(r);
|
||||
else if (r == "B") b.Enqueue(r);
|
||||
});
|
||||
|
||||
var ha = engine.Subscribe(["A"], TimeSpan.FromMilliseconds(100));
|
||||
var hb = engine.Subscribe(["B"], TimeSpan.FromMilliseconds(100));
|
||||
|
||||
await WaitForAsync(() => a.Count >= 1 && b.Count >= 1, TimeSpan.FromSeconds(2));
|
||||
engine.Unsubscribe(ha);
|
||||
var aCount = a.Count;
|
||||
src.Values["B"] = 77;
|
||||
await WaitForAsync(() => b.Count >= 2, TimeSpan.FromSeconds(2));
|
||||
|
||||
a.Count.ShouldBe(aCount);
|
||||
b.Count.ShouldBeGreaterThanOrEqualTo(2);
|
||||
engine.Unsubscribe(hb);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Reader_exception_does_not_crash_loop()
|
||||
{
|
||||
var throwCount = 0;
|
||||
var readCount = 0;
|
||||
Task<IReadOnlyList<DataValueSnapshot>> Reader(IReadOnlyList<string> refs, CancellationToken ct)
|
||||
{
|
||||
if (Interlocked.Increment(ref readCount) <= 2)
|
||||
{
|
||||
Interlocked.Increment(ref throwCount);
|
||||
throw new InvalidOperationException("boom");
|
||||
}
|
||||
var now = DateTime.UtcNow;
|
||||
return Task.FromResult<IReadOnlyList<DataValueSnapshot>>(
|
||||
refs.Select(r => new DataValueSnapshot(1, 0u, now, now)).ToList());
|
||||
}
|
||||
|
||||
var events = new ConcurrentQueue<string>();
|
||||
await using var engine = new PollGroupEngine(Reader,
|
||||
(h, r, s) => events.Enqueue(r));
|
||||
|
||||
var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
|
||||
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2));
|
||||
engine.Unsubscribe(handle);
|
||||
|
||||
throwCount.ShouldBe(2);
|
||||
events.Count.ShouldBeGreaterThanOrEqualTo(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unsubscribe_unknown_handle_returns_false()
|
||||
{
|
||||
var src = new FakeSource();
|
||||
await using var engine = new PollGroupEngine(src.ReadAsync, (_, _, _) => { });
|
||||
|
||||
var foreign = new DummyHandle();
|
||||
engine.Unsubscribe(foreign).ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ActiveSubscriptionCount_tracks_lifecycle()
|
||||
{
|
||||
var src = new FakeSource();
|
||||
src.Values["X"] = 1;
|
||||
await using var engine = new PollGroupEngine(src.ReadAsync, (_, _, _) => { });
|
||||
|
||||
engine.ActiveSubscriptionCount.ShouldBe(0);
|
||||
var h1 = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(200));
|
||||
var h2 = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(200));
|
||||
engine.ActiveSubscriptionCount.ShouldBe(2);
|
||||
|
||||
engine.Unsubscribe(h1);
|
||||
engine.ActiveSubscriptionCount.ShouldBe(1);
|
||||
engine.Unsubscribe(h2);
|
||||
engine.ActiveSubscriptionCount.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DisposeAsync_cancels_all_subscriptions()
|
||||
{
|
||||
var src = new FakeSource();
|
||||
src.Values["X"] = 1;
|
||||
|
||||
var events = new ConcurrentQueue<string>();
|
||||
var engine = new PollGroupEngine(src.ReadAsync,
|
||||
(h, r, s) => events.Enqueue(r));
|
||||
|
||||
_ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
|
||||
_ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
|
||||
await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
|
||||
|
||||
await engine.DisposeAsync();
|
||||
engine.ActiveSubscriptionCount.ShouldBe(0);
|
||||
|
||||
var afterDispose = events.Count;
|
||||
await Task.Delay(300);
|
||||
// After dispose no more events — everything is cancelled.
|
||||
events.Count.ShouldBe(afterDispose);
|
||||
}
|
||||
|
||||
private sealed record DummyHandle : ISubscriptionHandle
|
||||
{
|
||||
public string DiagnosticId => "dummy";
|
||||
}
|
||||
|
||||
private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
|
||||
{
|
||||
var deadline = DateTime.UtcNow + timeout;
|
||||
while (!condition() && DateTime.UtcNow < deadline)
|
||||
await Task.Delay(20);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,130 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class InFlightCounterTests
|
||||
{
|
||||
[Fact]
|
||||
public void StartThenComplete_NetsToZero()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallComplete("drv", "host-a");
|
||||
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void NestedStarts_SumDepth()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(3);
|
||||
|
||||
tracker.RecordCallComplete("drv", "host-a");
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CompleteBeforeStart_ClampedToZero()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordCallComplete("drv", "host-a");
|
||||
|
||||
// A stray Complete without a matching Start shouldn't drive the counter negative.
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void DifferentHosts_TrackIndependently()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallStart("drv", "host-b");
|
||||
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2);
|
||||
tracker.TryGet("drv", "host-b")!.CurrentInFlight.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ConcurrentStarts_DoNotLose_Count()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
Parallel.For(0, 500, _ => tracker.RecordCallStart("drv", "host-a"));
|
||||
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(500);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CapabilityInvoker_IncrementsTracker_DuringExecution()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
var invoker = new CapabilityInvoker(
|
||||
new DriverResiliencePipelineBuilder(),
|
||||
"drv-live",
|
||||
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||
driverType: "Modbus",
|
||||
statusTracker: tracker);
|
||||
|
||||
var observedMidCall = -1;
|
||||
await invoker.ExecuteAsync(
|
||||
DriverCapability.Read,
|
||||
"plc-1",
|
||||
async _ =>
|
||||
{
|
||||
observedMidCall = tracker.TryGet("drv-live", "plc-1")?.CurrentInFlight ?? -1;
|
||||
await Task.Yield();
|
||||
return 42;
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
observedMidCall.ShouldBe(1, "during call, in-flight == 1");
|
||||
tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0, "post-call, counter decremented");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CapabilityInvoker_ExceptionPath_DecrementsCounter()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
var invoker = new CapabilityInvoker(
|
||||
new DriverResiliencePipelineBuilder(),
|
||||
"drv-live",
|
||||
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||
statusTracker: tracker);
|
||||
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await invoker.ExecuteAsync<int>(
|
||||
DriverCapability.Write,
|
||||
"plc-1",
|
||||
_ => throw new InvalidOperationException("boom"),
|
||||
CancellationToken.None));
|
||||
|
||||
tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0,
|
||||
"finally-block must decrement even when call-site throws");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CapabilityInvoker_WithoutTracker_DoesNotThrow()
|
||||
{
|
||||
var invoker = new CapabilityInvoker(
|
||||
new DriverResiliencePipelineBuilder(),
|
||||
"drv-live",
|
||||
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||
statusTracker: null);
|
||||
|
||||
var result = await invoker.ExecuteAsync(
|
||||
DriverCapability.Read, "host-1",
|
||||
_ => ValueTask.FromResult(7),
|
||||
CancellationToken.None);
|
||||
|
||||
result.ShouldBe(7);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user