fix(data-connection-layer): resolve DataConnectionLayer-008,013 — O(1) unsubscribe via reverse index, atomic disconnect guard

This commit is contained in:
Joseph Doherty
2026-05-16 22:14:23 -04:00
parent 7d1cc5cbb4
commit ff4a4bdeb7
6 changed files with 196 additions and 24 deletions

View File

@@ -8,7 +8,7 @@
| Last reviewed | 2026-05-16 |
| Reviewer | claude-agent |
| Commit reviewed | `9c60592` |
| Open findings | 2 |
| Open findings | 0 |
## Summary
@@ -381,7 +381,7 @@ after.
|--|--|
| Severity | Low |
| Category | Performance & resource management |
| Status | Open |
| Status | Resolved |
| Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:540-569` |
**Description**
@@ -404,7 +404,26 @@ prior state captured before removal.
**Resolution**
_Unresolved._
Resolved 2026-05-16 (commit pending). A `tagPath → subscriber-count` reverse index
(`_tagSubscriberCount`) was added: `HandleSubscribeCompleted` increments it whenever a
tag is newly added to an instance's set, and `HandleUnsubscribe` decrements it,
releasing a tag at the adapter only when the count reaches zero. The "any other
subscriber" check is now O(1) per tag instead of an O(instances) `Where(...).Any()`
scan. The redundant `!_unresolvedTags.Contains(tagPath)` re-check (always true after
the unconditional `Remove` on the line above) was removed — the surviving branch is
entered only for tags that have a subscription id, which are by definition resolved,
so `_resolvedTags--` is now unconditional with an explanatory comment. The cleanup
also fixed a latent leak the original code could not reach: an unresolved tag whose
last subscriber unsubscribes is now removed from `_unresolvedTags`/`_resolutionInFlight`
and decremented from `_totalSubscribed` (previously it lingered in the retry timer and
the subscribed total forever). Regression test
`DCL008_Unsubscribe_OnlyReleasesTagWhenLastSubscriberLeaves` subscribes a tag from two
instances plus an exclusive tag, then unsubscribes each instance and asserts the
shared tag is released at the adapter only after the last subscriber leaves and the
health counters track correctly. (This finding is a performance refactor, not a
correctness bug — the pre-fix `Where(...).Any()` logic was functionally correct, so
the test passes against both versions and serves as a behavioural guard for the
refactor.)
### DataConnectionLayer-009 — Implemented failover heuristic diverges from the documented state machine
@@ -606,7 +625,7 @@ deliberately not made here because this task is scoped to
|--|--|
| Severity | Low |
| Category | Documentation & comments |
| Status | Open |
| Status | Resolved |
| Location | `src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs:270-281` |
**Description**
@@ -629,4 +648,16 @@ under a race and is tolerated downstream."
**Resolution**
_Unresolved._
Resolved 2026-05-16 (commit pending). Rather than weaken the XML comment to match the
weak guard, the guard was made genuinely atomic so the documented "only the first
caller fires the event" guarantee becomes true. `OpcUaDataConnection._disconnectFired`
and `RealOpcUaClient._connectionLostFired` were changed from `volatile bool` to `int`,
and the check-then-set in `RaiseDisconnected` / `OnSessionKeepAlive` replaced with a
single `Interlocked.Exchange(ref flag, 1) != 0` compare-and-set; the reset on connect
uses `Interlocked.Exchange(ref flag, 0)`. The XML comments on both methods were updated
to describe the atomic compare-and-set explicitly. Regression test
`DCL013_ConcurrentConnectionLost_RaisesDisconnectedExactlyOnce` runs 25 rounds, each
fanning 32 barrier-synchronised threads that raise the client's `ConnectionLost` event
simultaneously, and asserts `Disconnected` fires exactly once per round; against a
non-atomic check-then-set it double-fires (verified by temporarily reverting the
guard), and it passes against the atomic fix.

View File

@@ -50,6 +50,13 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
/// </summary>
private readonly Dictionary<string, string> _subscriptionIds = new();
/// <summary>
/// DataConnectionLayer-008: reverse index of how many instances subscribe to each
/// tag path. Lets <see cref="HandleUnsubscribe"/> decide whether any other instance
/// still needs a tag in O(1) instead of scanning every instance's tag set.
/// </summary>
private readonly Dictionary<string, int> _tagSubscriberCount = new();
/// <summary>
/// Tags whose path resolution failed and are awaiting retry.
/// </summary>
@@ -600,7 +607,12 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
foreach (var result in msg.Results)
{
instanceTags.Add(result.TagPath);
// DataConnectionLayer-008: only a tag newly added to THIS instance's set
// increments the reference count, so the count stays an accurate "number
// of distinct instances subscribed to this tag".
if (instanceTags.Add(result.TagPath))
_tagSubscriberCount[result.TagPath] =
_tagSubscriberCount.GetValueOrDefault(result.TagPath) + 1;
// Re-check against current state: another subscribe may have resolved the
// same tag while this request's I/O was in flight.
@@ -687,20 +699,29 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
// WP-14: Cleanup on Instance Actor stop
foreach (var tagPath in tags)
{
// Check if any other instance is still subscribed to this tag
var otherSubscribers = _subscriptionsByInstance
.Where(kvp => kvp.Key != request.InstanceUniqueName && kvp.Value.Contains(tagPath))
.Any();
// DataConnectionLayer-008: drop this instance's reference; the tag is only
// released at the adapter when no other instance still subscribes to it.
// The reference count makes this O(1) instead of an O(instances) scan.
var remaining = _tagSubscriberCount.GetValueOrDefault(tagPath) - 1;
if (remaining > 0)
{
_tagSubscriberCount[tagPath] = remaining;
continue;
}
_tagSubscriberCount.Remove(tagPath);
if (!otherSubscribers && _subscriptionIds.TryGetValue(tagPath, out var subId))
// Last subscriber gone. A tag with a subscription id is a resolved tag;
// an unresolved tag never has a subscription id, so reaching this branch
// via TryGetValue means the tag was resolved — decrement _resolvedTags
// unconditionally (the previous `!_unresolvedTags.Contains` re-check after
// an unconditional Remove was always-true dead logic).
if (_subscriptionIds.TryGetValue(tagPath, out var subId))
{
_ = _adapter.UnsubscribeAsync(subId);
_subscriptionIds.Remove(tagPath);
_unresolvedTags.Remove(tagPath);
_resolutionInFlight.Remove(tagPath);
_totalSubscribed--;
if (!_unresolvedTags.Contains(tagPath))
_resolvedTags--;
_resolvedTags--;
// DataConnectionLayer-006: drop the tag's tracked quality so it is no
// longer counted by PushBadQualityForAllTags (which sets _tagsBadQuality
@@ -716,6 +737,16 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
}
}
}
else if (_unresolvedTags.Remove(tagPath))
{
// Last subscriber gone for a tag that had never resolved: stop
// retrying it and drop it from the subscribed total. The previous
// implementation never reached this case (its guard required a
// subscription id), so an unresolved tag leaked into the retry timer
// and TotalSubscribedTags forever after its instance unsubscribed.
_resolutionInFlight.Remove(tagPath);
_totalSubscribed--;
}
}
_subscriptionsByInstance.Remove(request.InstanceUniqueName);

View File

@@ -38,7 +38,12 @@ public class OpcUaDataConnection : IDataConnection
_logger = logger;
}
private volatile bool _disconnectFired;
// DataConnectionLayer-013: an int flag toggled with Interlocked.Exchange so the
// "only the first caller fires Disconnected" guard in RaiseDisconnected is genuinely
// atomic. A plain volatile bool gives visibility but not atomicity — two threads
// (e.g. the keep-alive thread and a ReadAsync failure path) could both observe it
// false and both raise the event. 0 = not fired, 1 = fired.
private int _disconnectFired;
public ConnectionHealth Status => _status;
public event Action? Disconnected;
@@ -82,7 +87,7 @@ public class OpcUaDataConnection : IDataConnection
await _client.ConnectAsync(_endpointUrl, options, cancellationToken);
_status = ConnectionHealth.Connected;
_disconnectFired = false;
Interlocked.Exchange(ref _disconnectFired, 0);
_logger.LogInformation("OPC UA connected to {Endpoint}", _endpointUrl);
await StartHeartbeatMonitorAsync(config.Heartbeat, cancellationToken);
@@ -285,12 +290,15 @@ public class OpcUaDataConnection : IDataConnection
/// <summary>
/// Marks the connection as disconnected and fires the Disconnected event once.
/// Thread-safe: only the first caller triggers the event.
/// Thread-safe: the firing guard is an atomic compare-and-set
/// (<see cref="Interlocked.Exchange(ref int, int)"/>), so when several threads race
/// here — e.g. the keep-alive thread via <see cref="OnClientConnectionLost"/> and a
/// <c>ReadAsync</c> failure path — exactly one of them observes the 0→1 transition
/// and invokes <see cref="Disconnected"/>.
/// </summary>
private void RaiseDisconnected()
{
if (_disconnectFired) return;
_disconnectFired = true;
if (Interlocked.Exchange(ref _disconnectFired, 1) != 0) return;
_status = ConnectionHealth.Disconnected;
_logger.LogWarning("OPC UA connection to {Endpoint} lost", _endpointUrl);
Disconnected?.Invoke();

View File

@@ -24,7 +24,10 @@ public class RealOpcUaClient : IOpcUaClient
// Clear() is undefined behaviour, so they must be ConcurrentDictionary.
private readonly ConcurrentDictionary<string, MonitoredItem> _monitoredItems = new();
private readonly ConcurrentDictionary<string, Action<string, object?, DateTime, uint>> _callbacks = new();
private volatile bool _connectionLostFired;
// DataConnectionLayer-013: int flag toggled with Interlocked.Exchange so the
// once-only ConnectionLost guard in OnSessionKeepAlive is atomic, not just visible.
// 0 = not fired, 1 = fired.
private int _connectionLostFired;
private OpcUaConnectionOptions _options = new();
private readonly OpcUaGlobalOptions _globalOptions;
private readonly ILogger<RealOpcUaClient> _logger;
@@ -112,7 +115,7 @@ public class RealOpcUaClient : IOpcUaClient
"ScadaLink-DCL-Session", (uint)opts.SessionTimeoutMs, userIdentity, null, cancellationToken);
// Detect server going offline via keep-alive failures
_connectionLostFired = false;
Interlocked.Exchange(ref _connectionLostFired, 0);
_session.KeepAlive += OnSessionKeepAlive;
// Store options for monitored item creation
@@ -243,14 +246,15 @@ public class RealOpcUaClient : IOpcUaClient
/// <summary>
/// Called by the OPC UA SDK when a keep-alive response arrives (or fails).
/// When CurrentState is bad, the server is unreachable.
/// When CurrentState is bad, the server is unreachable. The once-only guard is an
/// atomic compare-and-set, so a burst of failed keep-alives raises
/// <see cref="ConnectionLost"/> exactly once.
/// </summary>
private void OnSessionKeepAlive(ISession session, KeepAliveEventArgs e)
{
if (ServiceResult.IsBad(e.Status))
{
if (_connectionLostFired) return;
_connectionLostFired = true;
if (Interlocked.Exchange(ref _connectionLostFired, 1) != 0) return;
ConnectionLost?.Invoke();
}
}

View File

@@ -836,4 +836,63 @@ public class DataConnectionActorTests : TestKit
Assert.Equal(5, report.TotalSubscribedTags); // all 5 tags tracked
Assert.Equal(3, report.ResolvedTags); // only the 3 good ones resolved
}
// ── DataConnectionLayer-008: HandleUnsubscribe shared-tag reference counting ──
[Fact]
public async Task DCL008_Unsubscribe_OnlyReleasesTagWhenLastSubscriberLeaves()
{
// Regression test for DataConnectionLayer-008. HandleUnsubscribe must release a
// tag at the adapter only when no other instance still subscribes to it. The
// O(n) per-tag scan over every instance was replaced with an O(1) reference
// count; this guards that the reference count tracks shared subscriptions
// correctly — a shared tag is kept while any subscriber remains and the
// resolved-tag counter and adapter UnsubscribeAsync stay consistent.
var unsubscribed = new System.Collections.Concurrent.ConcurrentBag<string>();
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
.Returns(ci => Task.FromResult("sub-" + (string)ci[0]));
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
.Returns(new ReadResult(false, null, null));
_mockAdapter.UnsubscribeAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
.Returns(ci => { unsubscribed.Add((string)ci[0]); return Task.CompletedTask; });
var actor = CreateConnectionActor("dcl008-shared");
await Task.Delay(300);
// Two instances both subscribe to the shared tag; instA also has an exclusive tag.
actor.Tell(new SubscribeTagsRequest("c1", "instA", "dcl008-shared",
["shared/tag", "exclusive/a"], DateTimeOffset.UtcNow));
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
actor.Tell(new SubscribeTagsRequest("c2", "instB", "dcl008-shared",
["shared/tag"], DateTimeOffset.UtcNow));
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
// Unsubscribe instA — shared/tag must stay (instB still subscribes); only
// exclusive/a is released at the adapter.
actor.Tell(new UnsubscribeTagsRequest("c3", "instA", "dcl008-shared", DateTimeOffset.UtcNow));
await Task.Delay(300);
Assert.Contains("sub-exclusive/a", unsubscribed);
Assert.DoesNotContain("sub-shared/tag", unsubscribed);
// Health: 1 tag still subscribed and resolved (shared/tag held by instB).
actor.Tell(new DataConnectionActor.GetHealthReport());
var report1 = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
Assert.Equal(1, report1.TotalSubscribedTags);
Assert.Equal(1, report1.ResolvedTags);
// Unsubscribe instB — now shared/tag has no subscribers and is released.
actor.Tell(new UnsubscribeTagsRequest("c4", "instB", "dcl008-shared", DateTimeOffset.UtcNow));
await Task.Delay(300);
Assert.Contains("sub-shared/tag", unsubscribed);
actor.Tell(new DataConnectionActor.GetHealthReport());
var report2 = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
Assert.Equal(0, report2.TotalSubscribedTags);
Assert.Equal(0, report2.ResolvedTags);
}
}

View File

@@ -116,6 +116,45 @@ public class OpcUaDataConnectionTests
Assert.Equal(ConnectionHealth.Disconnected, _adapter.Status);
}
[Fact]
public async Task DCL013_ConcurrentConnectionLost_RaisesDisconnectedExactlyOnce()
{
// Regression test for DataConnectionLayer-013. RaiseDisconnected used a
// non-atomic check-then-set on a volatile bool: two threads racing through it
// (e.g. the keep-alive thread and a ReadAsync failure path, both routed via
// OnClientConnectionLost) could both observe _disconnectFired == false and both
// invoke Disconnected. The guard is now an atomic Interlocked.Exchange, so a
// burst of concurrent connection-lost callbacks fires the event exactly once.
// Repeat the burst: reconnecting between rounds re-arms the guard, so each
// round must independently fire Disconnected exactly once. Repetition makes
// the (timing-dependent) non-atomic race overwhelmingly likely to be caught.
const int rounds = 25;
const int threads = 32;
for (var round = 0; round < rounds; round++)
{
_mockClient.IsConnected.Returns(true);
await _adapter.ConnectAsync(new Dictionary<string, string>());
var fired = 0;
void Handler() => Interlocked.Increment(ref fired);
_adapter.Disconnected += Handler;
// Fan out: many threads raise the client's ConnectionLost event together.
using (var ready = new Barrier(threads))
{
var tasks = Enumerable.Range(0, threads).Select(_ => Task.Run(() =>
{
ready.SignalAndWait();
_mockClient.ConnectionLost += Raise.Event<Action>();
})).ToArray();
await Task.WhenAll(tasks);
}
_adapter.Disconnected -= Handler;
Assert.Equal(1, fired);
}
}
[Fact]
public async Task Subscribe_DelegatesAndReturnsId()
{