diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs index 73c3a551..78b06030 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs @@ -82,6 +82,15 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit /// Per-connection gate. PRs 67+ serialize read/write/browse on this. internal SemaphoreSlim Gate => _gate; + /// + /// Test-only seam to swap the active Session the way + /// OnReconnectComplete does at runtime, so the stale-session race the + /// read/history paths guard against (re-read Session inside _gate, not before) + /// can be exercised deterministically without standing up a live reconnect handler. + /// Production code never calls this — the SDK reconnect handler owns the real swap. + /// + internal void SetSessionForTest(ISession? session) => Session = session; + private DriverHealth _health = new(DriverState.Unknown, null, null); private bool _disposed; /// URL of the endpoint the driver actually connected to. Exposed via . @@ -1131,7 +1140,10 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit public async Task SubscribeAsync( IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) { - var session = RequireSession(); + // Fast-fail before queuing if the driver isn't connected, but do NOT bind the + // subscription to this reference — a reconnect can swap Session while we wait on _gate. + // The session actually used is re-read inside the gate (Driver.OpcUaClient-001/-006). 
+ _ = RequireSession(); var id = Interlocked.Increment(ref _nextSubscriptionId); var handle = new OpcUaSubscriptionHandle(id); @@ -1157,6 +1169,11 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); try { + // Re-read Session inside the critical section: a reconnect completing while we were + // blocked on _gate has already swapped in the new session via OnReconnectComplete. + var session = Session + ?? throw new InvalidOperationException( + "OpcUaClient session was lost before the subscription could be created."); session.AddSubscription(subscription); await subscription.CreateAsync(cancellationToken).ConfigureAwait(false); @@ -1296,7 +1313,10 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit public async Task SubscribeAlarmsAsync( IReadOnlyList sourceNodeIds, CancellationToken cancellationToken) { - var session = RequireSession(); + // Fast-fail before queuing if the driver isn't connected, but do NOT bind the alarm + // subscription to this reference — a reconnect can swap Session while we wait on _gate. + // The session actually used is re-read inside the gate (Driver.OpcUaClient-001/-006). + _ = RequireSession(); var id = Interlocked.Increment(ref _nextAlarmSubscriptionId); var handle = new OpcUaAlarmSubscriptionHandle(id); @@ -1345,6 +1365,11 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); try { + // Re-read Session inside the critical section: a reconnect completing while we were + // blocked on _gate has already swapped in the new session via OnReconnectComplete. + var session = Session + ?? 
throw new InvalidOperationException( + "OpcUaClient session was lost before the alarm subscription could be created."); session.AddSubscription(subscription); await subscription.CreateAsync(cancellationToken).ConfigureAwait(false); @@ -1410,31 +1435,42 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit // list without guarding the size themselves — e.g. a bulk-ack UI that built an empty // list because the filter matched nothing. if (acknowledgements.Count == 0) return; - var session = RequireSession(); - - // OPC UA A&C: call the AcknowledgeableConditionType.Acknowledge method on each - // condition node with EventId + Comment arguments. CallAsync accepts a batch — - // one CallMethodRequest per ack. - var callRequests = new CallMethodRequestCollection(); - foreach (var ack in acknowledgements) - { - if (!TryParseNodeId(session, ack.ConditionId, out var conditionId)) continue; - callRequests.Add(new CallMethodRequest - { - ObjectId = conditionId, - MethodId = MethodIds.AcknowledgeableConditionType_Acknowledge, - InputArguments = [ - new Variant(Array.Empty()), // EventId — server-side best-effort; empty resolves to 'most recent' - new Variant(new LocalizedText(ack.Comment ?? string.Empty)), - ], - }); - } - - if (callRequests.Count == 0) return; + // Fast-fail before queuing if the driver isn't connected, but do NOT bind the ack calls + // (or the namespace-relative ConditionId parse) to this reference — a reconnect can swap + // Session while we wait on _gate. The session actually used is re-read inside the gate + // (Driver.OpcUaClient-001/-006). + _ = RequireSession(); await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); try { + // Re-read Session inside the critical section: a reconnect completing while we were + // blocked on _gate has already swapped in the new session via OnReconnectComplete. + // ConditionId parsing is namespace-relative, so it must use the current session's + // namespace table too. 
+ var session = Session; + if (session is null) return; + + // OPC UA A&C: call the AcknowledgeableConditionType.Acknowledge method on each + // condition node with EventId + Comment arguments. CallAsync accepts a batch — + // one CallMethodRequest per ack. NOTE(review): the null-Session early return above drops the acks silently, whereas the subscribe paths throw InvalidOperationException when the session is lost mid-wait — confirm this is intended. + var callRequests = new CallMethodRequestCollection(); + foreach (var ack in acknowledgements) + { + if (!TryParseNodeId(session, ack.ConditionId, out var conditionId)) continue; + callRequests.Add(new CallMethodRequest + { + ObjectId = conditionId, + MethodId = MethodIds.AcknowledgeableConditionType_Acknowledge, + InputArguments = [ + new Variant(Array.Empty()), // EventId — server-side best-effort; empty resolves to 'most recent' + new Variant(new LocalizedText(ack.Comment ?? string.Empty)), + ], + }); + } + + if (callRequests.Count == 0) return; + + try { var resp = await session.CallAsync( @@ -1615,20 +1651,30 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit private async Task ExecuteHistoryReadAsync( string fullReference, ExtensionObject historyReadDetails, CancellationToken ct) { - var session = RequireSession(); - if (!TryParseNodeId(session, fullReference, out var nodeId)) - { - return new Core.Abstractions.HistoryReadResult([], null); - } - - var nodesToRead = new HistoryReadValueIdCollection - { - new HistoryReadValueId { NodeId = nodeId }, - }; + // Make sure a session exists before queuing on the gate, but do NOT bind the wire call + // (or the namespace-relative NodeId parse) to this reference — a reconnect can swap + // Session while we wait on _gate. The session actually used is re-read inside the gate + // (Driver.OpcUaClient-001/-006). + _ = RequireSession(); await _gate.WaitAsync(ct).ConfigureAwait(false); try { + // Re-read Session inside the critical section: if a reconnect completed while we + // were blocked on _gate, OnReconnectComplete has already swapped in the new session. 
+ // NodeId parsing is namespace-relative, so it must also use the current session's + // namespace table. NOTE(review): if Session is null here (lost mid-wait) the caller gets an empty result, not the InvalidOperationException the subscribe paths throw — confirm this is intended. + var session = Session; + if (session is null || !TryParseNodeId(session, fullReference, out var nodeId)) + { + return new Core.Abstractions.HistoryReadResult([], null); + } + + var nodesToRead = new HistoryReadValueIdCollection + { + new HistoryReadValueId { NodeId = nodeId }, + }; + var resp = await session.HistoryReadAsync( requestHeader: null, historyReadDetails: historyReadDetails, @@ -1785,38 +1831,47 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents, CancellationToken cancellationToken) { - var session = RequireSession(); - - NodeId notifierNodeId; - if (string.IsNullOrEmpty(sourceName)) - { - notifierNodeId = ObjectIds.Server; - } - else if (!TryParseNodeId(session, sourceName, out var parsed)) - { - return new Core.Abstractions.HistoricalEventsResult([], null); - } - else - { - notifierNodeId = parsed; - } - - var details = new ReadEventDetails - { - StartTime = startUtc, - EndTime = endUtc, - NumValuesPerNode = maxEvents <= 0 ? 0u : (uint)maxEvents, - Filter = BuildBaseEventFilter(), - }; - - var nodesToRead = new HistoryReadValueIdCollection - { - new HistoryReadValueId { NodeId = notifierNodeId }, - }; + // Confirm a session exists before queuing; the session actually used — and the + // namespace-relative source-NodeId parse — are re-read inside the gate so a reconnect + // mid-wait can't leave us reading from a closed session (Driver.OpcUaClient-001/-006). + _ = RequireSession(); await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); try { + // Re-read Session inside the critical section: OnReconnectComplete may have swapped + // in a new session while we were blocked on _gate, and the source-NodeId parse is + // namespace-relative so it must bind against the current session's namespace table. NOTE(review): a null Session at this point returns an empty result rather than throwing — confirm this is intended. 
+ var session = Session; + if (session is null) return new Core.Abstractions.HistoricalEventsResult([], null); + + NodeId notifierNodeId; + if (string.IsNullOrEmpty(sourceName)) + { + notifierNodeId = ObjectIds.Server; + } + else if (!TryParseNodeId(session, sourceName, out var parsed)) + { + return new Core.Abstractions.HistoricalEventsResult([], null); + } + else + { + notifierNodeId = parsed; + } + + var details = new ReadEventDetails + { + StartTime = startUtc, + EndTime = endUtc, + NumValuesPerNode = maxEvents <= 0 ? 0u : (uint)maxEvents, + Filter = BuildBaseEventFilter(), + }; + + var nodesToRead = new HistoryReadValueIdCollection + { + new HistoryReadValueId { NodeId = notifierNodeId }, + }; + var resp = await session.HistoryReadAsync( requestHeader: null, historyReadDetails: new ExtensionObject(details), diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientStaleSessionRaceTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientStaleSessionRaceTests.cs new file mode 100644 index 00000000..64f973e6 --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientStaleSessionRaceTests.cs @@ -0,0 +1,161 @@ +using Moq; +using Opc.Ua; +using Opc.Ua.Client; +using Shouldly; +using Xunit; + +namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests; + +/// +/// Regression coverage for the stale-session concurrency race in the history/read wire +/// paths. A background reconnect handler can swap the driver's Session +/// while a caller is parked on _gate.WaitAsync. If a method captures the session +/// reference before the gate and uses it afterwards, it issues the wire call against +/// the closed/stale session. The correct idiom — already used by ReadAsync/WriteAsync — is +/// a fast-fail guard outside the gate plus an authoritative re-read of Session +/// inside the gate. These tests prove the history/read funnels follow it. 
+/// +/// +/// The swap is staged so the race is exercised, not raced against: +/// +/// The test acquires _gate first, so the method-under-test physically +/// cannot enter the critical section (read Session-in-gate / issue the wire call) until +/// the test releases the gate. Whatever the implementation does after the gate +/// therefore always sees the post-swap session. +/// The only thing that must be ordered is the buggy capture-before-gate: +/// its pre-gate NodeId parse reads the OLD session's MessageContext, which +/// completes a TaskCompletionSource. We wait on that signal so the swap +/// lands after the buggy capture has grabbed the OLD session — making the RED +/// outcome deterministic, not timing-dependent. +/// The fixed implementation reads no session member before the gate, so the +/// signal never fires; a bounded fallback then swaps+releases. That is correct for the +/// fixed path because its session read happens only after we release the gate, so the +/// swap is already visible regardless of ordering. +/// +/// A correct implementation re-reads Session inside the gate and lands the wire call +/// on the NEW session; the buggy capture-before-gate code lands it on the OLD one. +/// +[Trait("Category", "Unit")] +public sealed class OpcUaClientStaleSessionRaceTests +{ + private static Mock NewSessionMock(TaskCompletionSource? prologueReached = null) + { + var resp = new HistoryReadResponse + { + ResponseHeader = new ResponseHeader(), + Results = new HistoryReadResultCollection(), + DiagnosticInfos = new DiagnosticInfoCollection(), + }; + + var mock = new Mock(MockBehavior.Loose); + // NodeId parsing inside the driver resolves the reference against the session's + // MessageContext. The capture-before-gate code reads this getter in its synchronous prologue + // (before the gate), so signalling here proves the pre-gate capture path has run. 
+ mock.SetupGet(s => s.MessageContext) + .Returns(new ServiceMessageContext()) + .Callback(() => prologueReached?.TrySetResult()); + mock.SetupGet(s => s.NamespaceUris).Returns(new NamespaceTable()); + mock.Setup(s => s.HistoryReadAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(resp); + return mock; + } + + private static void VerifyWireHitNewSessionNotOld(Mock newSession, Mock oldSession) + { + newSession.Verify(s => s.HistoryReadAsync( + It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny()), + Times.Once); + oldSession.Verify(s => s.HistoryReadAsync( + It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny()), + Times.Never); + } + + /// + /// ExecuteHistoryReadAsync (the ReadRaw/ReadProcessed/ReadAtTime funnel) must issue the + /// HistoryRead against the session current after the gate, not a reference + /// captured before WaitAsync. + /// + [Fact] + public async Task ReadRawAsync_uses_session_swapped_in_across_the_gate_not_the_captured_one() + { + var ct = TestContext.Current.CancellationToken; + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-stale-raw"); + + var prologueReached = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var oldSession = NewSessionMock(prologueReached); + var newSession = NewSessionMock(); + drv.SetSessionForTest(oldSession.Object); + + // Hold the gate so the method parks on WaitAsync after running its synchronous prologue. + await drv.Gate.WaitAsync(ct); + + var call = drv.ReadRawAsync( + "ns=2;s=Counter", DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow, 1000, ct); + + // Wait until the buggy capture-before-gate has grabbed the OLD session (signalled via + // its MessageContext read), then stage the reconnect swap and release the gate. 
The + // fixed path reads no pre-gate session member, so the signal never fires and the + // bounded fallback elapses — harmless, because the fixed read happens only after the + // gate is released and therefore always sees the swapped session. + await WaitForPreGateCaptureOrFallbackAsync(prologueReached.Task, ct); + drv.SetSessionForTest(newSession.Object); + drv.Gate.Release(); + + await call; + + VerifyWireHitNewSessionNotOld(newSession, oldSession); + } + + /// + /// ReadEventsAsync is a separate HistoryRead site with the same hazard — it must also + /// re-read Session inside the gate rather than use the pre-gate capture. + /// + [Fact] + public async Task ReadEventsAsync_uses_session_swapped_in_across_the_gate_not_the_captured_one() + { + var ct = TestContext.Current.CancellationToken; + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-stale-events"); + + var prologueReached = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + // ReadEventsAsync only parses the source NodeId when sourceName is non-empty, so pass a + // concrete source id to force the prologue's MessageContext read (and its signal). + var oldSession = NewSessionMock(prologueReached); + var newSession = NewSessionMock(); + drv.SetSessionForTest(oldSession.Object); + + await drv.Gate.WaitAsync(ct); + + var call = drv.ReadEventsAsync( + sourceName: "ns=2;s=Pump17", startUtc: DateTime.UtcNow.AddMinutes(-5), + endUtc: DateTime.UtcNow, maxEvents: 100, cancellationToken: ct); + + await WaitForPreGateCaptureOrFallbackAsync(prologueReached.Task, ct); + drv.SetSessionForTest(newSession.Object); + drv.Gate.Release(); + + await call; + + VerifyWireHitNewSessionNotOld(newSession, oldSession); + } + + /// + /// Awaits the buggy path's pre-gate-capture signal, or a bounded fallback when it never + /// fires (the fixed path performs no pre-gate session member read). 
Generous so the + /// buggy-path signal effectively always wins first, keeping the RED outcome + /// deterministic; the fallback only matters for the GREEN (fixed) path where swap + /// ordering is irrelevant. If ct is cancelled the fallback completes early and Task.WhenAny returns without rethrowing, so this helper still returns normally. + /// + private static async Task WaitForPreGateCaptureOrFallbackAsync(Task signal, CancellationToken ct) + { + var fallback = Task.Delay(TimeSpan.FromSeconds(2), ct); + await Task.WhenAny(signal, fallback); + } +} diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests.csproj b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests.csproj index 99097026..5f8839d5 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests.csproj +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests.csproj @@ -12,6 +12,7 @@ + all