From e221371a0c7b1bff3f8d475bec79bf246a25d1f8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 22 May 2026 06:20:40 -0400 Subject: [PATCH] fix(client-shared): resolve High code-review findings (Client.Shared-005, Client.Shared-006) Client.Shared-005: _activeDataSubscriptions (a plain Dictionary) and the _activeAlarmSubscription tuple were mutated from the caller thread, the keep-alive failover path, and DisconnectAsync with no synchronization, risking bucket corrosion / InvalidOperationException / lost entries. Added a dedicated _subscriptionLock and wrapped every read/write of that bookkeeping state inside it (Subscribe/Unsubscribe[Alarms]Async, Disconnect, Dispose, and the snapshot/clear/re-record steps of ReplaySubscriptionsAsync). Awaited adapter calls stay outside the lock so it is never held across I/O. Client.Shared-006: HandleKeepAliveFailureAsync had only a non-atomic state check guarding re-entry, so two bad keep-alives could each start a failover loop, racing to dispose/replace _session and double-replaying subscriptions. It now claims an atomic _failoverInProgress slot via Interlocked.CompareExchange; a re-entrant call returns immediately. The loop body moved to RunFailoverAsync, wrapped in try/finally that resets the flag. Tests: added KeepAliveFailure_ReentrantWhileFailoverInFlight_RunsFailoverOnce and SubscribeAndUnsubscribe_ConcurrentCalls_DoNotCorruptState regression tests; made the FakeSubscriptionAdapter / FakeSessionAdapter / FakeSessionFactory test doubles thread-safe (and added a CreateGate hook) so the concurrency tests exercise production locking rather than fake state. All 138 Client.Shared tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- code-reviews/Client.Shared/findings.md | 10 +- .../OpcUaClientService.cs | 146 ++++++++++++++---- .../Fakes/FakeSessionAdapter.cs | 11 +- .../Fakes/FakeSessionFactory.cs | 13 +- .../Fakes/FakeSubscriptionAdapter.cs | 63 ++++++-- .../OpcUaClientServiceTests.cs | 66 ++++++++ 6 files changed, 248 insertions(+), 61 deletions(-) diff --git a/code-reviews/Client.Shared/findings.md b/code-reviews/Client.Shared/findings.md index 733c735..7444304 100644 --- a/code-reviews/Client.Shared/findings.md +++ b/code-reviews/Client.Shared/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-22 | | Commit reviewed | `76d35d1` | | Status | Reviewed | -| Open findings | 11 | +| Open findings | 9 | ## Checklist coverage @@ -93,13 +93,13 @@ | Severity | High | | Category | Concurrency & thread safety | | Location | `OpcUaClientService.cs:19`, `OpcUaClientService.cs:226-249`, `OpcUaClientService.cs:499-521` | -| Status | Open | +| Status | Resolved | **Description:** `_activeDataSubscriptions` is a plain `Dictionary` mutated from at least three thread contexts with no synchronization: the caller thread (`SubscribeAsync`/`UnsubscribeAsync`), the keep-alive callback thread (`HandleKeepAliveFailureAsync` -> `ReplaySubscriptionsAsync`, invoked fire-and-forget from the OPC UA `KeepAlive` event), and `DisconnectAsync`. Concurrent `Add`/`Remove`/`Clear`/enumeration on a non-thread-safe `Dictionary` can corrupt its internal buckets, throw `InvalidOperationException`, or lose entries. A failover firing while the UI calls `SubscribeAsync` is a realistic trigger. The `_activeAlarmSubscription` nullable tuple has the same exposure. **Recommendation:** Guard all access to `_activeDataSubscriptions` / `_activeAlarmSubscription` (and the `_session`/`_dataSubscription`/`_alarmSubscription` fields) with a single lock, or move subscription bookkeeping behind a `ConcurrentDictionary` plus a lock for the multi-field failover transition. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-22 — added a dedicated `_subscriptionLock` and wrapped every read/write of `_activeDataSubscriptions` and `_activeAlarmSubscription` (in Subscribe/Unsubscribe[Alarms]Async, Disconnect, Dispose, and the snapshot/clear/re-record steps of ReplaySubscriptionsAsync) inside it; awaited adapter calls run outside the lock to avoid holding it across I/O. ### Client.Shared-006 @@ -108,13 +108,13 @@ | Severity | High | | Category | Concurrency & thread safety | | Location | `OpcUaClientService.cs:97-100`, `OpcUaClientService.cs:432-497` | -| Status | Open | +| Status | Resolved | **Description:** `HandleKeepAliveFailureAsync` is launched fire-and-forget (`_ = HandleKeepAliveFailureAsync()`) from every bad keep-alive callback. The only guard against re-entry is the non-atomic check `if (_state == Reconnecting || _state == Disconnected) return;` at the top. Between that read and the `TransitionState(Reconnecting, ...)` write a few lines later, a second keep-alive failure (the SDK raises `KeepAlive` repeatedly while a session is down) can pass the same guard, and two failover loops run concurrently — each disposing `_session`, nulling subscription fields, and racing to assign a new `_session`. The session created by the loser leaks, and `ReplaySubscriptionsAsync` can run twice creating duplicate monitored items. **Recommendation:** Serialize failover with an `Interlocked.CompareExchange` flag or a `SemaphoreSlim(1,1)` so only one failover loop runs at a time; subsequent keep-alive failures during an in-flight failover should be ignored. Make the state transition atomic with the re-entry guard. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-22 — `HandleKeepAliveFailureAsync` now claims an atomic `_failoverInProgress` slot via `Interlocked.CompareExchange(ref _failoverInProgress, 1, 0)`; a re-entrant bad keep-alive sees `1` and returns immediately, so only one failover loop runs. The loop body moved to `RunFailoverAsync`, wrapped in try/finally that resets the flag with `Interlocked.Exchange`. ### Client.Shared-007 diff --git a/src/Client/ZB.MOM.WW.OtOpcUa.Client.Shared/OpcUaClientService.cs b/src/Client/ZB.MOM.WW.OtOpcUa.Client.Shared/OpcUaClientService.cs index 2659f5e..ee019ad 100644 --- a/src/Client/ZB.MOM.WW.OtOpcUa.Client.Shared/OpcUaClientService.cs +++ b/src/Client/ZB.MOM.WW.OtOpcUa.Client.Shared/OpcUaClientService.cs @@ -15,9 +15,20 @@ public sealed class OpcUaClientService : IOpcUaClientService { private static readonly ILogger Logger = Log.ForContext(); + // Guards all access to the subscription-bookkeeping state below + // (_activeDataSubscriptions and _activeAlarmSubscription). The dictionary + // and tuple are mutated from the caller thread, the keep-alive failover + // path, and DisconnectAsync, so every read/write must be inside this lock. + private readonly object _subscriptionLock = new(); + // Track active data subscriptions for replay after failover private readonly Dictionary _activeDataSubscriptions = new(); + // Re-entry guard for HandleKeepAliveFailureAsync. The OPC UA stack raises + // KeepAlive repeatedly while a session is down; only one failover loop may + // run at a time. 0 = idle, 1 = failover in progress (Interlocked-managed). + private int _failoverInProgress; + private readonly IApplicationConfigurationFactory _configFactory; private readonly IEndpointDiscovery _endpointDiscovery; @@ -146,8 +157,12 @@ public sealed class OpcUaClientService : IOpcUaClientService } finally { - _activeDataSubscriptions.Clear(); - _activeAlarmSubscription = null; + lock (_subscriptionLock) + { + _activeDataSubscriptions.Clear(); + _activeAlarmSubscription = null; + } + CurrentConnectionInfo = null; TransitionState(ConnectionState.Disconnected, endpointUrl); } @@ -223,15 +238,22 @@ public sealed class OpcUaClientService : IOpcUaClientService ThrowIfNotConnected(); var nodeIdStr = nodeId.ToString(); - if (_activeDataSubscriptions.ContainsKey(nodeIdStr)) - return; // Already subscribed + lock (_subscriptionLock) + { + if (_activeDataSubscriptions.ContainsKey(nodeIdStr)) + return; // Already subscribed + } if (_dataSubscription == null) _dataSubscription = await _session!.CreateSubscriptionAsync(intervalMs, ct); var handle = await _dataSubscription.AddDataChangeMonitoredItemAsync( nodeId, intervalMs, OnDataChangeNotification, ct); - _activeDataSubscriptions[nodeIdStr] = (nodeId, intervalMs, handle); + lock (_subscriptionLock) + { + _activeDataSubscriptions[nodeIdStr] = (nodeId, intervalMs, handle); + } + Logger.Debug("Subscribed to data changes on {NodeId}", nodeId); } @@ -241,12 +263,20 @@ public sealed class OpcUaClientService : IOpcUaClientService ThrowIfDisposed(); var nodeIdStr = nodeId.ToString(); - if (!_activeDataSubscriptions.TryGetValue(nodeIdStr, out var sub)) - return; // Not subscribed, safe to ignore + (NodeId NodeId, int IntervalMs, uint Handle) sub; + lock (_subscriptionLock) + { + if (!_activeDataSubscriptions.TryGetValue(nodeIdStr, out sub)) + return; // Not subscribed, safe to ignore + } if (_dataSubscription != null) await _dataSubscription.RemoveMonitoredItemAsync(sub.Handle, ct); - _activeDataSubscriptions.Remove(nodeIdStr); + lock (_subscriptionLock) + { + _activeDataSubscriptions.Remove(nodeIdStr); + } + Logger.Debug("Unsubscribed from data changes on {NodeId}", nodeId); } @@ -267,7 +297,11 @@ public sealed class OpcUaClientService : IOpcUaClientService await _alarmSubscription.AddEventMonitoredItemAsync( monitorNode, intervalMs, filter, OnAlarmEventNotification, ct); - _activeAlarmSubscription = (sourceNodeId, intervalMs); + lock (_subscriptionLock) + { + _activeAlarmSubscription = (sourceNodeId, intervalMs); + } + Logger.Debug("Subscribed to alarm events on {NodeId}", monitorNode); } @@ -281,7 +315,12 @@ public sealed class OpcUaClientService : IOpcUaClientService await _alarmSubscription.DeleteAsync(ct); _alarmSubscription = null; - _activeAlarmSubscription = null; + + lock (_subscriptionLock) + { + _activeAlarmSubscription = null; + } + Logger.Debug("Unsubscribed from alarm events"); } @@ -393,8 +432,13 @@ public sealed class OpcUaClientService : IOpcUaClientService _dataSubscription?.Dispose(); _alarmSubscription?.Dispose(); _session?.Dispose(); - _activeDataSubscriptions.Clear(); - _activeAlarmSubscription = null; + + lock (_subscriptionLock) + { + _activeDataSubscriptions.Clear(); + _activeAlarmSubscription = null; + } + CurrentConnectionInfo = null; _state = ConnectionState.Disconnected; } @@ -430,6 +474,26 @@ public sealed class OpcUaClientService : IOpcUaClientService } private async Task HandleKeepAliveFailureAsync() + { + // Serialize failover: the OPC UA stack raises KeepAlive repeatedly + // while a session is down, so multiple bad keep-alives can fire before + // the first failover loop finishes. CompareExchange atomically claims + // the failover slot; a re-entrant call sees 1 and returns immediately, + // guaranteeing exactly one failover loop runs at a time. + if (Interlocked.CompareExchange(ref _failoverInProgress, 1, 0) != 0) + return; + + try + { + await RunFailoverAsync(); + } + finally + { + Interlocked.Exchange(ref _failoverInProgress, 0); + } + } + + private async Task RunFailoverAsync() { if (_state == ConnectionState.Reconnecting || _state == ConnectionState.Disconnected) return; @@ -498,33 +562,43 @@ public sealed class OpcUaClientService : IOpcUaClientService private async Task ReplaySubscriptionsAsync() { - // Replay data subscriptions - if (_activeDataSubscriptions.Count > 0) + // Snapshot the bookkeeping state under the lock, then clear it so the + // replayed handles can be recorded fresh as each monitored item is + // re-created. Awaited calls run outside the lock. + List> subscriptions; + (NodeId? SourceNodeId, int IntervalMs)? alarmSubscription; + lock (_subscriptionLock) { - var subscriptions = _activeDataSubscriptions.ToList(); + subscriptions = _activeDataSubscriptions.ToList(); + alarmSubscription = _activeAlarmSubscription; _activeDataSubscriptions.Clear(); - - foreach (var (nodeIdStr, (nodeId, intervalMs, _)) in subscriptions) - try - { - if (_dataSubscription == null) - _dataSubscription = await _session!.CreateSubscriptionAsync(intervalMs, CancellationToken.None); - - var handle = await _dataSubscription.AddDataChangeMonitoredItemAsync( - nodeId, intervalMs, OnDataChangeNotification, CancellationToken.None); - _activeDataSubscriptions[nodeIdStr] = (nodeId, intervalMs, handle); - } - catch (Exception ex) - { - Logger.Warning(ex, "Failed to replay data subscription for {NodeId}", nodeIdStr); - } + _activeAlarmSubscription = null; } + // Replay data subscriptions + foreach (var (nodeIdStr, (nodeId, intervalMs, _)) in subscriptions) + try + { + if (_dataSubscription == null) + _dataSubscription = await _session!.CreateSubscriptionAsync(intervalMs, CancellationToken.None); + + var handle = await _dataSubscription.AddDataChangeMonitoredItemAsync( + nodeId, intervalMs, OnDataChangeNotification, CancellationToken.None); + + lock (_subscriptionLock) + { + _activeDataSubscriptions[nodeIdStr] = (nodeId, intervalMs, handle); + } + } + catch (Exception ex) + { + Logger.Warning(ex, "Failed to replay data subscription for {NodeId}", nodeIdStr); + } + // Replay alarm subscription - if (_activeAlarmSubscription.HasValue) + if (alarmSubscription.HasValue) { - var (sourceNodeId, intervalMs) = _activeAlarmSubscription.Value; - _activeAlarmSubscription = null; + var (sourceNodeId, intervalMs) = alarmSubscription.Value; try { var monitorNode = sourceNodeId ?? ObjectIds.Server; @@ -532,7 +606,11 @@ public sealed class OpcUaClientService : IOpcUaClientService var filter = CreateAlarmEventFilter(); await _alarmSubscription.AddEventMonitoredItemAsync( monitorNode, intervalMs, filter, OnAlarmEventNotification, CancellationToken.None); - _activeAlarmSubscription = (sourceNodeId, intervalMs); + + lock (_subscriptionLock) + { + _activeAlarmSubscription = (sourceNodeId, intervalMs); + } } catch (Exception ex) { diff --git a/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/Fakes/FakeSessionAdapter.cs b/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/Fakes/FakeSessionAdapter.cs index 11607a1..e9928cf 100644 --- a/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/Fakes/FakeSessionAdapter.cs +++ b/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/Fakes/FakeSessionAdapter.cs @@ -159,10 +159,13 @@ internal sealed class FakeSessionAdapter : ISessionAdapter /// public Task CreateSubscriptionAsync(int publishingIntervalMs, CancellationToken ct) { - var sub = NextSubscription ?? new FakeSubscriptionAdapter(); - NextSubscription = null; - _createdSubscriptions.Add(sub); - return Task.FromResult(sub); + lock (_createdSubscriptions) + { + var sub = NextSubscription ?? new FakeSubscriptionAdapter(); + NextSubscription = null; + _createdSubscriptions.Add(sub); + return Task.FromResult(sub); + } } /// diff --git a/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/Fakes/FakeSessionFactory.cs b/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/Fakes/FakeSessionFactory.cs index 98cdbd0..1a89585 100644 --- a/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/Fakes/FakeSessionFactory.cs +++ b/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/Fakes/FakeSessionFactory.cs @@ -12,15 +12,24 @@ internal sealed class FakeSessionFactory : ISessionFactory public bool ThrowOnCreate { get; set; } public string? LastEndpointUrl { get; private set; } + /// + /// Optional gate that, when set, blocks until completed. + /// Lets tests hold a failover loop in-flight to exercise re-entrancy. + /// + public TaskCompletionSource? CreateGate { get; set; } + public IReadOnlyList CreatedSessions => _createdSessions; - public Task CreateSessionAsync( + public async Task CreateSessionAsync( ApplicationConfiguration config, EndpointDescription endpoint, string sessionName, uint sessionTimeoutMs, UserIdentity identity, CancellationToken ct) { CreateCallCount++; LastEndpointUrl = endpoint.EndpointUrl; + if (CreateGate != null) + await CreateGate.Task; + if (ThrowOnCreate) throw new InvalidOperationException("FakeSessionFactory configured to fail."); @@ -39,7 +48,7 @@ internal sealed class FakeSessionFactory : ISessionFactory // Ensure endpoint URL matches session.EndpointUrl = endpoint.EndpointUrl; _createdSessions.Add(session); - return Task.FromResult(session); + return session; } /// diff --git a/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/Fakes/FakeSubscriptionAdapter.cs b/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/Fakes/FakeSubscriptionAdapter.cs index f41b36e..ddb9e9a 100644 --- a/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/Fakes/FakeSubscriptionAdapter.cs +++ b/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/Fakes/FakeSubscriptionAdapter.cs @@ -12,6 +12,10 @@ internal sealed class FakeSubscriptionAdapter : ISubscriptionAdapter Dictionary? DataCallback, Action? EventCallback )> _items = new(); + // Guards _items so concurrent-subscription tests exercise the production + // locking rather than tripping over the test double's own state. + private readonly object _itemsLock = new(); + private uint _nextHandle = 100; /// /// Gets a value indicating whether the fake subscription has been deleted. @@ -34,7 +38,13 @@ internal sealed class FakeSubscriptionAdapter : ISubscriptionAdapter /// /// Gets the handles of all active items. /// - public IReadOnlyCollection ActiveHandles => _items.Keys.ToList(); + public IReadOnlyCollection ActiveHandles + { + get + { + lock (_itemsLock) return _items.Keys.ToList(); + } + } /// public uint SubscriptionId { get; set; } = 42; @@ -43,17 +53,24 @@ internal sealed class FakeSubscriptionAdapter : ISubscriptionAdapter public Task AddDataChangeMonitoredItemAsync(NodeId nodeId, int samplingIntervalMs, Action onDataChange, CancellationToken ct) { - AddDataChangeCount++; - var handle = _nextHandle++; - _items[handle] = (nodeId, onDataChange, null); - return Task.FromResult(handle); + lock (_itemsLock) + { + AddDataChangeCount++; + var handle = _nextHandle++; + _items[handle] = (nodeId, onDataChange, null); + return Task.FromResult(handle); + } } /// public Task RemoveMonitoredItemAsync(uint clientHandle, CancellationToken ct) { - RemoveCount++; - _items.Remove(clientHandle); + lock (_itemsLock) + { + RemoveCount++; + _items.Remove(clientHandle); + } + return Task.CompletedTask; } @@ -61,10 +78,13 @@ internal sealed class FakeSubscriptionAdapter : ISubscriptionAdapter public Task AddEventMonitoredItemAsync(NodeId nodeId, int samplingIntervalMs, EventFilter filter, Action onEvent, CancellationToken ct) { - AddEventCount++; - var handle = _nextHandle++; - _items[handle] = (nodeId, null, onEvent); - return Task.FromResult(handle); + lock (_itemsLock) + { + AddEventCount++; + var handle = _nextHandle++; + _items[handle] = (nodeId, null, onEvent); + return Task.FromResult(handle); + } } /// @@ -80,7 +100,7 @@ internal sealed class FakeSubscriptionAdapter : ISubscriptionAdapter public Task DeleteAsync(CancellationToken ct) { Deleted = true; - _items.Clear(); + lock (_itemsLock) _items.Clear(); return Task.CompletedTask; } @@ -89,7 +109,7 @@ internal sealed class FakeSubscriptionAdapter : ISubscriptionAdapter /// public void Dispose() { - _items.Clear(); + lock (_itemsLock) _items.Clear(); } /// @@ -97,8 +117,13 @@ internal sealed class FakeSubscriptionAdapter : ISubscriptionAdapter /// public void SimulateDataChange(uint handle, DataValue value) { - if (_items.TryGetValue(handle, out var item) && item.DataCallback != null) - item.DataCallback(item.NodeId.ToString(), value); + (NodeId NodeId, Action? DataCallback, Action? EventCallback) item; + lock (_itemsLock) + { + if (!_items.TryGetValue(handle, out item)) return; + } + + item.DataCallback?.Invoke(item.NodeId.ToString(), value); } /// @@ -106,6 +131,12 @@ internal sealed class FakeSubscriptionAdapter : ISubscriptionAdapter /// public void SimulateEvent(uint handle, EventFieldList eventFields) { - if (_items.TryGetValue(handle, out var item) && item.EventCallback != null) item.EventCallback(eventFields); + (NodeId NodeId, Action? DataCallback, Action? EventCallback) item; + lock (_itemsLock) + { + if (!_items.TryGetValue(handle, out item)) return; + } + + item.EventCallback?.Invoke(eventFields); } } diff --git a/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/OpcUaClientServiceTests.cs b/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/OpcUaClientServiceTests.cs index d9c9c22..9d2cf55 100644 --- a/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/OpcUaClientServiceTests.cs +++ b/tests/Client/ZB.MOM.WW.OtOpcUa.Client.Shared.Tests/OpcUaClientServiceTests.cs @@ -920,6 +920,72 @@ public class OpcUaClientServiceTests : IDisposable _service.IsConnected.ShouldBeFalse(); } + /// + /// Regression for Client.Shared-006: a re-entrant keep-alive failure that fires while a + /// failover loop is still in-flight must be ignored, so only one failover runs and only + /// one replacement session is created. + /// + [Fact] + public async Task KeepAliveFailure_ReentrantWhileFailoverInFlight_RunsFailoverOnce() + { + var session1 = new FakeSessionAdapter { EndpointUrl = "opc.tcp://primary:4840" }; + var session2 = new FakeSessionAdapter { EndpointUrl = "opc.tcp://backup:4840" }; + _sessionFactory.EnqueueSession(session1); + _sessionFactory.EnqueueSession(session2); + + var settings = ValidSettings("opc.tcp://primary:4840"); + settings.FailoverUrls = ["opc.tcp://backup:4840"]; + + await _service.ConnectAsync(settings); + var createCountAfterConnect = _sessionFactory.CreateCallCount; // 1 + + // Hold the failover's session creation open so it stays in-flight. + var gate = new TaskCompletionSource(); + _sessionFactory.CreateGate = gate; + + // First bad keep-alive starts the failover loop (now blocked on the gate). + session1.SimulateKeepAlive(false); + + // Re-entrant bad keep-alives while failover is still running must be ignored. + session1.SimulateKeepAlive(false); + session1.SimulateKeepAlive(false); + + // Release the gate so the in-flight failover completes. + gate.SetResult(); + await Task.Delay(200); + + // Exactly one extra session created by the single failover loop. + _sessionFactory.CreateCallCount.ShouldBe(createCountAfterConnect + 1); + _service.CurrentConnectionInfo!.EndpointUrl.ShouldBe("opc.tcp://backup:4840"); + } + + /// + /// Regression for Client.Shared-005: concurrent subscribe/unsubscribe calls mutating the + /// active-subscription bookkeeping must not corrupt the dictionary or throw. + /// + [Fact] + public async Task SubscribeAndUnsubscribe_ConcurrentCalls_DoNotCorruptState() + { + var fakeSub = new FakeSubscriptionAdapter(); + var session = new FakeSessionAdapter { NextSubscription = fakeSub }; + _sessionFactory.EnqueueSession(session); + await _service.ConnectAsync(ValidSettings()); + + var tasks = new List(); + for (var i = 0; i < 50; i++) + { + var nodeId = new NodeId($"ns=2;s=Node{i}"); + tasks.Add(Task.Run(async () => + { + await _service.SubscribeAsync(nodeId); + await _service.UnsubscribeAsync(nodeId); + })); + } + + // No InvalidOperationException from concurrent Dictionary mutation. + await Should.NotThrowAsync(() => Task.WhenAll(tasks)); + } + // --- Dispose tests --- ///