using Moq;
using Opc.Ua;
using Opc.Ua.Client;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;

namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;

/// <summary>
/// Regression coverage for the stale-session concurrency race in the history/read wire
/// paths. A background reconnect handler can swap the driver's <c>Session</c>
/// while a caller is parked on <c>_gate.WaitAsync</c>. If a method captures the session
/// reference before the gate and uses it afterwards, it issues the wire call against
/// the closed/stale session. The correct idiom — already used by <c>ReadAsync</c>/<c>WriteAsync</c> — is
/// a fast-fail guard outside the gate plus an authoritative re-read of <c>Session</c>
/// inside the gate. These tests prove the history/read funnels follow it.
/// </summary>
/// <remarks>
/// <para>The swap is staged so the race is exercised, not raced against:</para>
/// <list type="bullet">
/// <item><description>
/// The test acquires <c>_gate</c> first, so the method-under-test physically
/// cannot enter the critical section (read Session-in-gate / issue the wire call) until
/// the test releases the gate. Whatever the implementation does after the gate
/// therefore always sees the post-swap session.
/// </description></item>
/// <item><description>
/// The only thing that must be ordered is the buggy capture-before-gate:
/// its pre-gate NodeId parse reads the OLD session's <c>MessageContext</c>, which
/// completes a <see cref="TaskCompletionSource"/>. We wait on that signal so the swap
/// lands after the buggy capture has grabbed the OLD session — making the RED
/// outcome deterministic, not timing-dependent.
/// </description></item>
/// <item><description>
/// The fixed implementation reads no session member before the gate, so the
/// signal never fires; a bounded fallback then swaps+releases. That is correct for the
/// fixed path because its session read happens only after we release the gate, so the
/// swap is already visible regardless of ordering.
/// </description></item>
/// </list>
/// <para>
/// A correct implementation re-reads <c>Session</c> inside the gate and lands the wire call
/// on the NEW session; the buggy capture-before-gate code lands it on the OLD one.
/// </para>
/// </remarks>
[Trait("Category", "Unit")]
public sealed class OpcUaClientStaleSessionRaceTests
{
    /// <summary>
    /// Upper bound on how long to wait for the pre-gate-capture signal before swapping anyway.
    /// Only the fixed (GREEN) path ever waits this long; see
    /// <see cref="WaitForPreGateCaptureOrFallbackAsync"/>.
    /// </summary>
    private static readonly TimeSpan PreGateCaptureFallback = TimeSpan.FromMilliseconds(250);

    /// <summary>
    /// Builds a loose session mock whose HistoryRead and Call wire methods succeed with
    /// empty result collections.
    /// </summary>
    /// <param name="prologueReached">
    /// When non-null, completed every time the mock's <c>MessageContext</c> getter is read.
    /// </param>
    private static Mock<ISession> NewSessionMock(TaskCompletionSource? prologueReached = null)
    {
        var historyResponse = new HistoryReadResponse
        {
            ResponseHeader = new ResponseHeader(),
            Results = new HistoryReadResultCollection(),
            DiagnosticInfos = new DiagnosticInfoCollection(),
        };

        var mock = new Mock<ISession>(MockBehavior.Loose);

        // NodeId parsing inside the driver resolves the reference against the session's
        // MessageContext. This getter is read in the driver's synchronous prologue (before the
        // gate), so signalling here proves the pre-gate capture path has run.
        mock.SetupGet(s => s.MessageContext)
            .Returns(new ServiceMessageContext())
            .Callback(() => prologueReached?.TrySetResult());
        mock.SetupGet(s => s.NamespaceUris).Returns(new NamespaceTable());

        mock.Setup(s => s.HistoryReadAsync(
                It.IsAny<RequestHeader>(),
                It.IsAny<ExtensionObject>(),
                It.IsAny<TimestampsToReturn>(),
                It.IsAny<bool>(),
                It.IsAny<HistoryReadValueIdCollection>(),
                It.IsAny<CancellationToken>()))
            .ReturnsAsync(historyResponse);

        // Acknowledge issues its wire call via CallAsync; stub it so AcknowledgeAsync can run to
        // completion against whichever session it ends up binding to.
        mock.Setup(s => s.CallAsync(
                It.IsAny<RequestHeader>(),
                It.IsAny<CallMethodRequestCollection>(),
                It.IsAny<CancellationToken>()))
            .ReturnsAsync(new CallResponse
            {
                ResponseHeader = new ResponseHeader(),
                Results = new CallMethodResultCollection(),
                DiagnosticInfos = new DiagnosticInfoCollection(),
            });

        return mock;
    }

    /// <summary>Asserts exactly one HistoryRead landed on the new session and none on the old.</summary>
    private static void VerifyWireHitNewSessionNotOld(Mock<ISession> newSession, Mock<ISession> oldSession)
    {
        newSession.Verify(s => s.HistoryReadAsync(
            It.IsAny<RequestHeader>(),
            It.IsAny<ExtensionObject>(),
            It.IsAny<TimestampsToReturn>(),
            It.IsAny<bool>(),
            It.IsAny<HistoryReadValueIdCollection>(),
            It.IsAny<CancellationToken>()), Times.Once);
        oldSession.Verify(s => s.HistoryReadAsync(
            It.IsAny<RequestHeader>(),
            It.IsAny<ExtensionObject>(),
            It.IsAny<TimestampsToReturn>(),
            It.IsAny<bool>(),
            It.IsAny<HistoryReadValueIdCollection>(),
            It.IsAny<CancellationToken>()), Times.Never);
    }

    /// <summary>Asserts exactly one Call landed on the new session and none on the old.</summary>
    private static void VerifyAckWireHitNewSessionNotOld(Mock<ISession> newSession, Mock<ISession> oldSession)
    {
        newSession.Verify(s => s.CallAsync(
            It.IsAny<RequestHeader>(),
            It.IsAny<CallMethodRequestCollection>(),
            It.IsAny<CancellationToken>()), Times.Once);
        oldSession.Verify(s => s.CallAsync(
            It.IsAny<RequestHeader>(),
            It.IsAny<CallMethodRequestCollection>(),
            It.IsAny<CancellationToken>()), Times.Never);
    }

    /// <summary>
    /// ExecuteHistoryReadAsync (the ReadRaw/ReadProcessed/ReadAtTime funnel) must issue the
    /// HistoryRead against the session <i>current</i> after the gate, not a reference
    /// captured before <c>WaitAsync</c>.
    /// </summary>
    [Fact]
    public async Task ReadRawAsync_uses_session_swapped_in_across_the_gate_not_the_captured_one()
    {
        var ct = TestContext.Current.CancellationToken;
        using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-stale-raw");
        var prologueReached = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var oldSession = NewSessionMock(prologueReached);
        var newSession = NewSessionMock();
        drv.SetSessionForTest(oldSession.Object);

        await RunWithSessionSwappedWhileGateHeldAsync(
            drv,
            newSession,
            prologueReached.Task,
            async () => await drv.ReadRawAsync(
                "ns=2;s=Counter", DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow, 1000, ct),
            ct);

        VerifyWireHitNewSessionNotOld(newSession, oldSession);
    }

    /// <summary>
    /// ReadEventsAsync is a separate HistoryRead site with the same hazard — it must also
    /// re-read <c>Session</c> inside the gate rather than use the pre-gate capture.
    /// </summary>
    [Fact]
    public async Task ReadEventsAsync_uses_session_swapped_in_across_the_gate_not_the_captured_one()
    {
        var ct = TestContext.Current.CancellationToken;
        using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-stale-events");
        var prologueReached = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var oldSession = NewSessionMock(prologueReached);
        var newSession = NewSessionMock();
        drv.SetSessionForTest(oldSession.Object);

        // ReadEventsAsync only parses the source NodeId when sourceName is non-empty, so pass a
        // concrete source id to force the prologue's MessageContext read (and its signal).
        await RunWithSessionSwappedWhileGateHeldAsync(
            drv,
            newSession,
            prologueReached.Task,
            async () => await drv.ReadEventsAsync(
                sourceName: "ns=2;s=Pump17",
                startUtc: DateTime.UtcNow.AddMinutes(-5),
                endUtc: DateTime.UtcNow,
                maxEvents: 100,
                cancellationToken: ct),
            ct);

        VerifyWireHitNewSessionNotOld(newSession, oldSession);
    }

    /// <summary>
    /// AcknowledgeAsync issues its Acknowledge method CallAsync against the session
    /// <i>current</i> after the gate — and parses the namespace-relative ConditionId against that
    /// same session — rather than a reference captured before <c>WaitAsync</c>. This is the third
    /// gate-protected wire site; the two subscribe paths (<c>SubscribeAsync</c>,
    /// <c>SubscribeAlarmsAsync</c>) received the identical fix but are not covered here
    /// because asserting them requires a live <c>Subscription</c> object (the SDK's
    /// <c>AddSubscription</c>/<c>CreateAsync</c> can't be exercised through this mock seam);
    /// their correctness is reviewable by inspection against this same idiom.
    /// </summary>
    [Fact]
    public async Task AcknowledgeAsync_uses_session_swapped_in_across_the_gate_not_the_captured_one()
    {
        var ct = TestContext.Current.CancellationToken;
        using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-stale-ack");
        var prologueReached = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var oldSession = NewSessionMock(prologueReached);
        var newSession = NewSessionMock();
        drv.SetSessionForTest(oldSession.Object);

        await RunWithSessionSwappedWhileGateHeldAsync(
            drv,
            newSession,
            prologueReached.Task,
            async () => await drv.AcknowledgeAsync(
                [new AlarmAcknowledgeRequest("ns=2;s=Pump17", "ns=2;s=Pump17", "operator ack")], ct),
            ct);

        VerifyAckWireHitNewSessionNotOld(newSession, oldSession);
    }

    /// <summary>
    /// Stages the reconnect race:
    /// <list type="number">
    /// <item><description>Holds the driver's gate.</description></item>
    /// <item><description>Starts the method under test, which runs its synchronous prologue and then parks on the gate.</description></item>
    /// <item><description>Waits for the pre-gate-capture signal or the bounded fallback.</description></item>
    /// <item><description>Swaps in <paramref name="newSession"/>.</description></item>
    /// <item><description>Releases the gate and awaits the call.</description></item>
    /// </list>
    /// </summary>
    /// <param name="drv">Driver whose gate is held and whose session is swapped.</param>
    /// <param name="newSession">Session installed while the call is parked on the gate.</param>
    /// <param name="preGateCaptureSignal">Completed when the OLD session's <c>MessageContext</c> is read.</param>
    /// <param name="startCall">
    /// Invokes the method under test. Wrapping it in an async lambda accepts any awaitable
    /// return type; the lambda still runs the method's synchronous prologue before returning.
    /// </param>
    /// <param name="ct">Test cancellation token.</param>
    /// <remarks>
    /// The gate is released in a <c>finally</c> so that an exception raised while it is held
    /// cannot leave it permanently acquired for the rest of the test, including driver disposal.
    /// An example is a synchronous throw from the method under test.
    /// </remarks>
    private static async Task RunWithSessionSwappedWhileGateHeldAsync(
        OpcUaClientDriver drv,
        Mock<ISession> newSession,
        Task preGateCaptureSignal,
        Func<Task> startCall,
        CancellationToken ct)
    {
        // Hold the gate so the method parks on WaitAsync after running its synchronous prologue.
        await drv.Gate.WaitAsync(ct);

        Task call;
        try
        {
            call = startCall();

            // Wait until the buggy capture-before-gate has grabbed the OLD session (signalled via
            // its MessageContext read), then stage the reconnect swap. The fixed path reads no
            // pre-gate session member, so the signal never fires and the bounded fallback
            // elapses. That is harmless, because the fixed read happens only after the gate is
            // released and therefore always sees the swapped session.
            await WaitForPreGateCaptureOrFallbackAsync(preGateCaptureSignal, ct);
            drv.SetSessionForTest(newSession.Object);
        }
        finally
        {
            drv.Gate.Release();
        }

        await call;
    }

    /// <summary>
    /// Awaits the buggy path's pre-gate-capture signal, or a bounded fallback when it never
    /// fires (the fixed path performs no pre-gate session member read).
    /// </summary>
    /// <remarks>
    /// The buggy-path signal fires from a synchronous <c>MessageContext</c> read that runs before
    /// any await, so it always wins the race well inside the fallback. This keeps the RED
    /// outcome deterministic. The fallback only matters for the GREEN (fixed) path, where swap
    /// ordering is irrelevant because the fixed read happens only after the gate is released.
    /// A short bound keeps the suite fast without weakening the guarantee.
    /// <see cref="Task.WhenAny(Task[])"/> does not throw if the fallback is cancelled via
    /// <paramref name="ct"/>; the caller simply proceeds.
    /// </remarks>
    private static async Task WaitForPreGateCaptureOrFallbackAsync(Task signal, CancellationToken ct)
    {
        var fallback = Task.Delay(PreGateCaptureFallback, ct);
        await Task.WhenAny(signal, fallback);
    }
}