eede0c926b
Closes the code-review coverage gap: AcknowledgeAsync now has its own swap-across-the-gate regression test (CallAsync lands on the post-gate session), the subscribe-path coverage gap is documented, and the bounded fallback drops from 2s to 250ms (the buggy-path signal is a synchronous pre-await read, so it always wins well inside the bound).
222 lines
11 KiB
C#
222 lines
11 KiB
C#
using Moq;
|
|
using Opc.Ua;
|
|
using Opc.Ua.Client;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
|
|
|
|
/// <summary>
/// Regression coverage for the stale-session concurrency race in the gate-protected wire
/// paths exercised here: the history-read funnels and <c>AcknowledgeAsync</c>. A background
/// reconnect handler can swap <see cref="OpcUaClientDriver.Session"/> while a caller is parked
/// on <c>_gate.WaitAsync</c>. If a method captures the session reference <i>before</i> the
/// gate and uses it afterwards, it issues the wire call against the closed/stale session.
/// The correct idiom — already used by ReadAsync/WriteAsync — is a fast-fail guard outside
/// the gate plus an authoritative re-read of <c>Session</c> <i>inside</i> the gate. These
/// tests prove the history-read funnels and the acknowledge path follow it.
/// </summary>
/// <remarks>
/// The swap is staged so the race is exercised, not raced against:
/// <list type="number">
/// <item>The test acquires <c>_gate</c> first, so the method-under-test physically
/// cannot enter the critical section (read Session-in-gate / issue the wire call) until
/// the test releases the gate. Whatever the implementation does <i>after</i> the gate
/// therefore always sees the post-swap session.</item>
/// <item>The only thing that must be ordered is the <b>buggy</b> capture-before-gate:
/// its pre-gate NodeId parse reads the OLD session's <c>MessageContext</c>, which
/// completes a <see cref="TaskCompletionSource"/>. We wait on that signal so the swap
/// lands <i>after</i> the buggy capture has grabbed the OLD session — making the RED
/// outcome deterministic, not timing-dependent.</item>
/// <item>The fixed implementation reads no session member before the gate, so the
/// signal never fires; a bounded fallback then swaps+releases. That is correct for the
/// fixed path because its session read happens only after we release the gate, so the
/// swap is already visible regardless of ordering.</item>
/// </list>
/// A correct implementation re-reads <c>Session</c> inside the gate and lands the wire call
/// on the NEW session; the buggy capture-before-gate code lands it on the OLD one.
/// </remarks>
[Trait("Category", "Unit")]
public sealed class OpcUaClientStaleSessionRaceTests
{
    /// <summary>
    /// Upper bound on how long a test waits for the buggy path's pre-gate capture signal
    /// before staging the swap anyway. Only the fixed (GREEN) path ever waits the full
    /// bound, and there the swap ordering is irrelevant, so the bound is kept short.
    /// </summary>
    private static readonly TimeSpan PreGateCaptureFallback = TimeSpan.FromMilliseconds(250);

    /// <summary>
    /// Builds a loose <see cref="ISession"/> mock whose <c>HistoryReadAsync</c> and
    /// <c>CallAsync</c> are stubbed with empty responses, so either wire call can run to
    /// completion against it.
    /// </summary>
    /// <param name="prologueReached">
    /// Optional signal completed whenever the mock's <c>MessageContext</c> getter is read;
    /// pass it for the OLD session to observe a pre-gate capture. When <c>null</c>, the
    /// getter read is not signalled.
    /// </param>
    /// <returns>The configured mock; use <c>.Object</c> to install it on the driver.</returns>
    private static Mock<ISession> NewSessionMock(TaskCompletionSource? prologueReached = null)
    {
        var resp = new HistoryReadResponse
        {
            ResponseHeader = new ResponseHeader(),
            Results = new HistoryReadResultCollection(),
            DiagnosticInfos = new DiagnosticInfoCollection(),
        };

        var mock = new Mock<ISession>(MockBehavior.Loose);

        // NodeId parsing inside the driver resolves the reference against the session's
        // MessageContext. This getter is read in the driver's synchronous prologue (before the
        // gate), so signalling here proves the pre-gate capture path has run.
        mock.SetupGet(s => s.MessageContext)
            .Returns(new ServiceMessageContext())
            .Callback(() => prologueReached?.TrySetResult());
        mock.SetupGet(s => s.NamespaceUris).Returns(new NamespaceTable());
        mock.Setup(s => s.HistoryReadAsync(
                It.IsAny<RequestHeader>(),
                It.IsAny<ExtensionObject>(),
                It.IsAny<TimestampsToReturn>(),
                It.IsAny<bool>(),
                It.IsAny<HistoryReadValueIdCollection>(),
                It.IsAny<CancellationToken>()))
            .ReturnsAsync(resp);

        // Acknowledge issues its wire call via CallAsync; stub it so AcknowledgeAsync can run to
        // completion against whichever session it ends up binding to.
        mock.Setup(s => s.CallAsync(
                It.IsAny<RequestHeader>(),
                It.IsAny<CallMethodRequestCollection>(),
                It.IsAny<CancellationToken>()))
            .ReturnsAsync(new CallResponse
            {
                ResponseHeader = new ResponseHeader(),
                Results = new CallMethodResultCollection(),
                DiagnosticInfos = new DiagnosticInfoCollection(),
            });

        return mock;
    }

    /// <summary>
    /// Asserts that exactly one <c>HistoryReadAsync</c> call reached
    /// <paramref name="newSession"/> and none reached <paramref name="oldSession"/>.
    /// </summary>
    /// <param name="newSession">The session swapped in while the gate was held.</param>
    /// <param name="oldSession">The session installed before the call started.</param>
    private static void VerifyWireHitNewSessionNotOld(Mock<ISession> newSession, Mock<ISession> oldSession)
    {
        newSession.Verify(s => s.HistoryReadAsync(
            It.IsAny<RequestHeader>(), It.IsAny<ExtensionObject>(), It.IsAny<TimestampsToReturn>(),
            It.IsAny<bool>(), It.IsAny<HistoryReadValueIdCollection>(), It.IsAny<CancellationToken>()),
            Times.Once);
        oldSession.Verify(s => s.HistoryReadAsync(
            It.IsAny<RequestHeader>(), It.IsAny<ExtensionObject>(), It.IsAny<TimestampsToReturn>(),
            It.IsAny<bool>(), It.IsAny<HistoryReadValueIdCollection>(), It.IsAny<CancellationToken>()),
            Times.Never);
    }

    /// <summary>
    /// Asserts that exactly one <c>CallAsync</c> call reached <paramref name="newSession"/>
    /// and none reached <paramref name="oldSession"/>.
    /// </summary>
    /// <param name="newSession">The session swapped in while the gate was held.</param>
    /// <param name="oldSession">The session installed before the call started.</param>
    private static void VerifyAckWireHitNewSessionNotOld(Mock<ISession> newSession, Mock<ISession> oldSession)
    {
        newSession.Verify(s => s.CallAsync(
            It.IsAny<RequestHeader>(), It.IsAny<CallMethodRequestCollection>(), It.IsAny<CancellationToken>()),
            Times.Once);
        oldSession.Verify(s => s.CallAsync(
            It.IsAny<RequestHeader>(), It.IsAny<CallMethodRequestCollection>(), It.IsAny<CancellationToken>()),
            Times.Never);
    }

    /// <summary>
    /// ExecuteHistoryReadAsync (the ReadRaw/ReadProcessed/ReadAtTime funnel) must issue the
    /// HistoryRead against the session current <i>after</i> the gate, not a reference
    /// captured before WaitAsync.
    /// </summary>
    [Fact]
    public async Task ReadRawAsync_uses_session_swapped_in_across_the_gate_not_the_captured_one()
    {
        var ct = TestContext.Current.CancellationToken;
        using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-stale-raw");

        var prologueReached = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var oldSession = NewSessionMock(prologueReached);
        var newSession = NewSessionMock();
        drv.SetSessionForTest(oldSession.Object);

        // Hold the gate so the method parks on WaitAsync after running its synchronous prologue.
        await drv.Gate.WaitAsync(ct);

        var call = drv.ReadRawAsync(
            "ns=2;s=Counter", DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow, 1000, ct);

        // Wait until the buggy capture-before-gate has grabbed the OLD session (signalled via
        // its MessageContext read), then stage the reconnect swap and release the gate. The
        // fixed path reads no pre-gate session member, so the signal never fires and the
        // bounded fallback elapses — harmless, because the fixed read happens only after the
        // gate is released and therefore always sees the swapped session.
        await WaitForPreGateCaptureOrFallbackAsync(prologueReached.Task, ct);
        drv.SetSessionForTest(newSession.Object);
        drv.Gate.Release();

        await call;

        VerifyWireHitNewSessionNotOld(newSession, oldSession);
    }

    /// <summary>
    /// ReadEventsAsync is a separate HistoryRead site with the same hazard — it must also
    /// re-read Session inside the gate rather than use the pre-gate capture.
    /// </summary>
    [Fact]
    public async Task ReadEventsAsync_uses_session_swapped_in_across_the_gate_not_the_captured_one()
    {
        var ct = TestContext.Current.CancellationToken;
        using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-stale-events");

        var prologueReached = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var oldSession = NewSessionMock(prologueReached);
        var newSession = NewSessionMock();
        drv.SetSessionForTest(oldSession.Object);

        // Hold the gate so the method parks on WaitAsync after running its synchronous prologue.
        await drv.Gate.WaitAsync(ct);

        // ReadEventsAsync only parses the source NodeId when sourceName is non-empty, so pass a
        // concrete source id to force the prologue's MessageContext read (and its signal).
        var call = drv.ReadEventsAsync(
            sourceName: "ns=2;s=Pump17", startUtc: DateTime.UtcNow.AddMinutes(-5),
            endUtc: DateTime.UtcNow, maxEvents: 100, cancellationToken: ct);

        // Swap only after the buggy pre-gate capture (if any) has run, then let the call proceed.
        await WaitForPreGateCaptureOrFallbackAsync(prologueReached.Task, ct);
        drv.SetSessionForTest(newSession.Object);
        drv.Gate.Release();

        await call;

        VerifyWireHitNewSessionNotOld(newSession, oldSession);
    }

    /// <summary>
    /// AcknowledgeAsync issues its Acknowledge method <c>CallAsync</c> against the session
    /// current after the gate — and parses the namespace-relative ConditionId against that
    /// same session — rather than a reference captured before WaitAsync. This is the third
    /// gate-protected wire site; the two subscribe paths (<c>SubscribeAsync</c>,
    /// <c>SubscribeAlarmsAsync</c>) received the identical fix but are not covered here
    /// because asserting them requires a live <c>Subscription</c> object (the SDK's
    /// <c>AddSubscription</c>/<c>CreateAsync</c> can't be exercised through this mock seam);
    /// their correctness is reviewable by inspection against this same idiom.
    /// </summary>
    [Fact]
    public async Task AcknowledgeAsync_uses_session_swapped_in_across_the_gate_not_the_captured_one()
    {
        var ct = TestContext.Current.CancellationToken;
        using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-stale-ack");

        var prologueReached = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var oldSession = NewSessionMock(prologueReached);
        var newSession = NewSessionMock();
        drv.SetSessionForTest(oldSession.Object);

        // Hold the gate so the method parks on WaitAsync after running its synchronous prologue.
        await drv.Gate.WaitAsync(ct);

        var call = drv.AcknowledgeAsync(
            [new AlarmAcknowledgeRequest("ns=2;s=Pump17", "ns=2;s=Pump17", "operator ack")], ct);

        // Swap only after the buggy pre-gate capture (if any) has run, then let the call proceed.
        await WaitForPreGateCaptureOrFallbackAsync(prologueReached.Task, ct);
        drv.SetSessionForTest(newSession.Object);
        drv.Gate.Release();

        await call;

        VerifyAckWireHitNewSessionNotOld(newSession, oldSession);
    }

    /// <summary>
    /// Awaits the buggy path's pre-gate-capture signal, or a bounded fallback when it never
    /// fires (the fixed path performs no pre-gate session member read). The buggy-path signal
    /// fires from a synchronous <c>MessageContext</c> read that runs before any await, so it
    /// always wins the race well inside the fallback — keeping the RED outcome deterministic.
    /// The fallback only matters for the GREEN (fixed) path where swap ordering is irrelevant
    /// (the fixed read happens only after the gate is released), so a short bound keeps the
    /// suite fast without weakening the guarantee.
    /// </summary>
    /// <param name="signal">Task completed when the pre-gate capture has been observed.</param>
    /// <param name="ct">
    /// Ends the wait early when cancelled. This method does not throw on cancellation; it
    /// simply returns, exactly as it does when the signal fires or the fallback elapses.
    /// </param>
    /// <returns>A task that completes once the signal fired, the bound elapsed, or
    /// <paramref name="ct"/> was cancelled — whichever happens first.</returns>
    private static async Task WaitForPreGateCaptureOrFallbackAsync(Task signal, CancellationToken ct)
    {
        // The linked source lets us stop the delay's timer ourselves without touching the
        // caller's token.
        using var fallbackCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        var fallback = Task.Delay(PreGateCaptureFallback, fallbackCts.Token);

        await Task.WhenAny(signal, fallback);

        // The wait is decided; when the signal won, cancel the still-pending delay so its
        // timer does not linger after this method returns. Cancelling an already-finished
        // delay is a no-op.
        fallbackCts.Cancel();
    }
}
|