fix(client-shared): resolve High code-review findings (Client.Shared-005, Client.Shared-006)

Client.Shared-005: _activeDataSubscriptions (a plain Dictionary) and the
_activeAlarmSubscription tuple were mutated from the caller thread, the
keep-alive failover path, and DisconnectAsync with no synchronization,
risking bucket corrosion / InvalidOperationException / lost entries.
Added a dedicated _subscriptionLock and wrapped every read/write of that
bookkeeping state inside it (Subscribe/Unsubscribe[Alarms]Async,
Disconnect, Dispose, and the snapshot/clear/re-record steps of
ReplaySubscriptionsAsync). Awaited adapter calls stay outside the lock so
it is never held across I/O.

Client.Shared-006: HandleKeepAliveFailureAsync had only a non-atomic
state check guarding re-entry, so two bad keep-alives could each start a
failover loop, racing to dispose/replace _session and double-replaying
subscriptions. It now claims an atomic _failoverInProgress slot via
Interlocked.CompareExchange; a re-entrant call returns immediately. The
loop body moved to RunFailoverAsync, wrapped in try/finally that resets
the flag.

Tests: added KeepAliveFailure_ReentrantWhileFailoverInFlight_RunsFailoverOnce
and SubscribeAndUnsubscribe_ConcurrentCalls_DoNotCorruptState regression
tests; made the FakeSubscriptionAdapter / FakeSessionAdapter /
FakeSessionFactory test doubles thread-safe (and added a CreateGate hook)
so the concurrency tests exercise production locking rather than fake
state. All 138 Client.Shared tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-22 06:20:40 -04:00
parent 3de688f8d6
commit e221371a0c
6 changed files with 248 additions and 61 deletions

View File

@@ -15,9 +15,20 @@ public sealed class OpcUaClientService : IOpcUaClientService
{
private static readonly ILogger Logger = Log.ForContext<OpcUaClientService>();
// Guards all access to the subscription-bookkeeping state below
// (_activeDataSubscriptions and _activeAlarmSubscription). The dictionary
// and tuple are mutated from the caller thread, the keep-alive failover
// path, and DisconnectAsync, so every read/write must be inside this lock.
private readonly object _subscriptionLock = new();
// Track active data subscriptions for replay after failover
private readonly Dictionary<string, (NodeId NodeId, int IntervalMs, uint Handle)> _activeDataSubscriptions = new();
// Re-entry guard for HandleKeepAliveFailureAsync. The OPC UA stack raises
// KeepAlive repeatedly while a session is down; only one failover loop may
// run at a time. 0 = idle, 1 = failover in progress (Interlocked-managed).
private int _failoverInProgress;
private readonly IApplicationConfigurationFactory _configFactory;
private readonly IEndpointDiscovery _endpointDiscovery;
@@ -146,8 +157,12 @@ public sealed class OpcUaClientService : IOpcUaClientService
}
finally
{
_activeDataSubscriptions.Clear();
_activeAlarmSubscription = null;
lock (_subscriptionLock)
{
_activeDataSubscriptions.Clear();
_activeAlarmSubscription = null;
}
CurrentConnectionInfo = null;
TransitionState(ConnectionState.Disconnected, endpointUrl);
}
@@ -223,15 +238,22 @@ public sealed class OpcUaClientService : IOpcUaClientService
ThrowIfNotConnected();
var nodeIdStr = nodeId.ToString();
if (_activeDataSubscriptions.ContainsKey(nodeIdStr))
return; // Already subscribed
lock (_subscriptionLock)
{
if (_activeDataSubscriptions.ContainsKey(nodeIdStr))
return; // Already subscribed
}
if (_dataSubscription == null) _dataSubscription = await _session!.CreateSubscriptionAsync(intervalMs, ct);
var handle = await _dataSubscription.AddDataChangeMonitoredItemAsync(
nodeId, intervalMs, OnDataChangeNotification, ct);
_activeDataSubscriptions[nodeIdStr] = (nodeId, intervalMs, handle);
lock (_subscriptionLock)
{
_activeDataSubscriptions[nodeIdStr] = (nodeId, intervalMs, handle);
}
Logger.Debug("Subscribed to data changes on {NodeId}", nodeId);
}
@@ -241,12 +263,20 @@ public sealed class OpcUaClientService : IOpcUaClientService
ThrowIfDisposed();
var nodeIdStr = nodeId.ToString();
if (!_activeDataSubscriptions.TryGetValue(nodeIdStr, out var sub))
return; // Not subscribed, safe to ignore
(NodeId NodeId, int IntervalMs, uint Handle) sub;
lock (_subscriptionLock)
{
if (!_activeDataSubscriptions.TryGetValue(nodeIdStr, out sub))
return; // Not subscribed, safe to ignore
}
if (_dataSubscription != null) await _dataSubscription.RemoveMonitoredItemAsync(sub.Handle, ct);
_activeDataSubscriptions.Remove(nodeIdStr);
lock (_subscriptionLock)
{
_activeDataSubscriptions.Remove(nodeIdStr);
}
Logger.Debug("Unsubscribed from data changes on {NodeId}", nodeId);
}
@@ -267,7 +297,11 @@ public sealed class OpcUaClientService : IOpcUaClientService
await _alarmSubscription.AddEventMonitoredItemAsync(
monitorNode, intervalMs, filter, OnAlarmEventNotification, ct);
_activeAlarmSubscription = (sourceNodeId, intervalMs);
lock (_subscriptionLock)
{
_activeAlarmSubscription = (sourceNodeId, intervalMs);
}
Logger.Debug("Subscribed to alarm events on {NodeId}", monitorNode);
}
@@ -281,7 +315,12 @@ public sealed class OpcUaClientService : IOpcUaClientService
await _alarmSubscription.DeleteAsync(ct);
_alarmSubscription = null;
_activeAlarmSubscription = null;
lock (_subscriptionLock)
{
_activeAlarmSubscription = null;
}
Logger.Debug("Unsubscribed from alarm events");
}
@@ -393,8 +432,13 @@ public sealed class OpcUaClientService : IOpcUaClientService
_dataSubscription?.Dispose();
_alarmSubscription?.Dispose();
_session?.Dispose();
_activeDataSubscriptions.Clear();
_activeAlarmSubscription = null;
lock (_subscriptionLock)
{
_activeDataSubscriptions.Clear();
_activeAlarmSubscription = null;
}
CurrentConnectionInfo = null;
_state = ConnectionState.Disconnected;
}
@@ -430,6 +474,26 @@ public sealed class OpcUaClientService : IOpcUaClientService
}
private async Task HandleKeepAliveFailureAsync()
{
// Serialize failover: the OPC UA stack raises KeepAlive repeatedly
// while a session is down, so multiple bad keep-alives can fire before
// the first failover loop finishes. CompareExchange atomically claims
// the failover slot; a re-entrant call sees 1 and returns immediately,
// guaranteeing exactly one failover loop runs at a time.
if (Interlocked.CompareExchange(ref _failoverInProgress, 1, 0) != 0)
return;
try
{
await RunFailoverAsync();
}
finally
{
Interlocked.Exchange(ref _failoverInProgress, 0);
}
}
private async Task RunFailoverAsync()
{
if (_state == ConnectionState.Reconnecting || _state == ConnectionState.Disconnected)
return;
@@ -498,33 +562,43 @@ public sealed class OpcUaClientService : IOpcUaClientService
private async Task ReplaySubscriptionsAsync()
{
// Replay data subscriptions
if (_activeDataSubscriptions.Count > 0)
// Snapshot the bookkeeping state under the lock, then clear it so the
// replayed handles can be recorded fresh as each monitored item is
// re-created. Awaited calls run outside the lock.
List<KeyValuePair<string, (NodeId NodeId, int IntervalMs, uint Handle)>> subscriptions;
(NodeId? SourceNodeId, int IntervalMs)? alarmSubscription;
lock (_subscriptionLock)
{
var subscriptions = _activeDataSubscriptions.ToList();
subscriptions = _activeDataSubscriptions.ToList();
alarmSubscription = _activeAlarmSubscription;
_activeDataSubscriptions.Clear();
foreach (var (nodeIdStr, (nodeId, intervalMs, _)) in subscriptions)
try
{
if (_dataSubscription == null)
_dataSubscription = await _session!.CreateSubscriptionAsync(intervalMs, CancellationToken.None);
var handle = await _dataSubscription.AddDataChangeMonitoredItemAsync(
nodeId, intervalMs, OnDataChangeNotification, CancellationToken.None);
_activeDataSubscriptions[nodeIdStr] = (nodeId, intervalMs, handle);
}
catch (Exception ex)
{
Logger.Warning(ex, "Failed to replay data subscription for {NodeId}", nodeIdStr);
}
_activeAlarmSubscription = null;
}
// Replay data subscriptions
foreach (var (nodeIdStr, (nodeId, intervalMs, _)) in subscriptions)
try
{
if (_dataSubscription == null)
_dataSubscription = await _session!.CreateSubscriptionAsync(intervalMs, CancellationToken.None);
var handle = await _dataSubscription.AddDataChangeMonitoredItemAsync(
nodeId, intervalMs, OnDataChangeNotification, CancellationToken.None);
lock (_subscriptionLock)
{
_activeDataSubscriptions[nodeIdStr] = (nodeId, intervalMs, handle);
}
}
catch (Exception ex)
{
Logger.Warning(ex, "Failed to replay data subscription for {NodeId}", nodeIdStr);
}
// Replay alarm subscription
if (_activeAlarmSubscription.HasValue)
if (alarmSubscription.HasValue)
{
var (sourceNodeId, intervalMs) = _activeAlarmSubscription.Value;
_activeAlarmSubscription = null;
var (sourceNodeId, intervalMs) = alarmSubscription.Value;
try
{
var monitorNode = sourceNodeId ?? ObjectIds.Server;
@@ -532,7 +606,11 @@ public sealed class OpcUaClientService : IOpcUaClientService
var filter = CreateAlarmEventFilter();
await _alarmSubscription.AddEventMonitoredItemAsync(
monitorNode, intervalMs, filter, OnAlarmEventNotification, CancellationToken.None);
_activeAlarmSubscription = (sourceNodeId, intervalMs);
lock (_subscriptionLock)
{
_activeAlarmSubscription = (sourceNodeId, intervalMs);
}
}
catch (Exception ex)
{