review(Client.Shared): fix Disconnect/failover subscription race + CT forwarding

Re-review at 7286d320. -012 (Medium): DisconnectAsync now snapshots+nulls the data/alarm
subscriptions under _subscriptionLock before async teardown (was racing RunFailoverAsync).
-013: SubscribeAlarmsAsync guarded by a semaphore (idempotent under concurrency). -014/-015:
forward CancellationToken through Delete/BrowseNext adapters. + TDD.
This commit is contained in:
Joseph Doherty
2026-06-19 11:58:15 -04:00
parent 887a31e825
commit d68c9db9f9
6 changed files with 240 additions and 28 deletions
@@ -108,7 +108,9 @@ internal sealed class DefaultSessionAdapter : ISessionAdapter
public async Task<(byte[]? ContinuationPoint, ReferenceDescriptionCollection References)> BrowseNextAsync(
byte[] continuationPoint, CancellationToken ct)
{
var (_, nextCp, nextRefs) = await _session.BrowseNextAsync(null, false, continuationPoint);
// Pass the caller's token so a cancelled browse does not block on the continuation
// round-trip (Client.Shared-015).
var (_, nextCp, nextRefs) = await _session.BrowseNextAsync(null, false, continuationPoint, ct);
return (nextCp, nextRefs ?? []);
}
@@ -105,7 +105,8 @@ internal sealed class DefaultSubscriptionAdapter : ISubscriptionAdapter
{
try
{
await _subscription.DeleteAsync(true);
// Forward the caller's token so the delete can be cancelled (Client.Shared-014).
await _subscription.DeleteAsync(true, ct);
}
catch (Exception ex)
{
@@ -21,6 +21,12 @@ public sealed class OpcUaClientService : IOpcUaClientService
// path, and DisconnectAsync, so every read/write must be inside this lock.
private readonly object _subscriptionLock = new();
// Serialises SubscribeAlarmsAsync / UnsubscribeAlarmsAsync so concurrent callers
// cannot both pass the _alarmSubscription == null check and create duplicate event
// subscriptions. Capacity 1 = at most one waiter; async-friendly (no thread-block).
// (Client.Shared-013)
private readonly SemaphoreSlim _alarmSubscribeSemaphore = new(1, 1);
// Track active data subscriptions for replay after failover
private readonly Dictionary<string, (NodeId NodeId, int IntervalMs, uint Handle)> _activeDataSubscriptions = new();
@@ -130,19 +136,26 @@ public sealed class OpcUaClientService : IOpcUaClientService
var endpointUrl = _session?.EndpointUrl ?? _settings?.EndpointUrl ?? string.Empty;
// Snapshot the subscription adapter references under the lock so a concurrent
// RunFailoverAsync that nulls _dataSubscription / _alarmSubscription (Client.Shared-012)
// cannot invalidate the references between the null-check and the DeleteAsync call.
ISubscriptionAdapter? dataSubscription;
ISubscriptionAdapter? alarmSubscription;
lock (_subscriptionLock)
{
dataSubscription = _dataSubscription;
alarmSubscription = _alarmSubscription;
_dataSubscription = null;
_alarmSubscription = null;
}
try
{
if (_dataSubscription != null)
{
await _dataSubscription.DeleteAsync(ct);
_dataSubscription = null;
}
if (dataSubscription != null)
await dataSubscription.DeleteAsync(ct);
if (_alarmSubscription != null)
{
await _alarmSubscription.DeleteAsync(ct);
_alarmSubscription = null;
}
if (alarmSubscription != null)
await alarmSubscription.DeleteAsync(ct);
if (_session != null)
{
@@ -290,22 +303,33 @@ public sealed class OpcUaClientService : IOpcUaClientService
ThrowIfDisposed();
ThrowIfNotConnected();
if (_alarmSubscription != null)
return; // Already subscribed to alarms
var monitorNode = sourceNodeId ?? ObjectIds.Server;
_alarmSubscription = await _session!.CreateSubscriptionAsync(intervalMs, ct);
var filter = CreateAlarmEventFilter();
await _alarmSubscription.AddEventMonitoredItemAsync(
monitorNode, intervalMs, filter, OnAlarmEventNotification, ct);
lock (_subscriptionLock)
// Serialise check-and-create under the semaphore (Client.Shared-013): concurrent
// callers cannot both pass the null-check and create duplicate alarm subscriptions.
// The semaphore is async-friendly; the subscription I/O runs inside the critical section.
await _alarmSubscribeSemaphore.WaitAsync(ct);
try
{
_activeAlarmSubscription = (sourceNodeId, intervalMs);
}
if (_alarmSubscription != null)
return; // Already subscribed to alarms
Logger.Debug("Subscribed to alarm events on {NodeId}", monitorNode);
var monitorNode = sourceNodeId ?? ObjectIds.Server;
_alarmSubscription = await _session!.CreateSubscriptionAsync(intervalMs, ct);
var filter = CreateAlarmEventFilter();
await _alarmSubscription.AddEventMonitoredItemAsync(
monitorNode, intervalMs, filter, OnAlarmEventNotification, ct);
lock (_subscriptionLock)
{
_activeAlarmSubscription = (sourceNodeId, intervalMs);
}
Logger.Debug("Subscribed to alarm events on {NodeId}", monitorNode);
}
finally
{
_alarmSubscribeSemaphore.Release();
}
}
/// <inheritdoc />
@@ -584,6 +608,7 @@ public sealed class OpcUaClientService : IOpcUaClientService
_dataSubscription?.Dispose();
_alarmSubscription?.Dispose();
_session?.Dispose();
_alarmSubscribeSemaphore.Dispose();
lock (_subscriptionLock)
{