fix(opcuaclient): re-resolve session inside _gate in history/read paths (stale-session race)
This commit is contained in:
@@ -82,6 +82,15 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit
|
||||
/// <summary>Per-connection gate. PRs 67+ serialize read/write/browse on this.</summary>
|
||||
internal SemaphoreSlim Gate => _gate;
|
||||
|
||||
/// <summary>
|
||||
/// Test-only seam to swap the active <see cref="Session"/> the way
|
||||
/// <see cref="OnReconnectComplete"/> does at runtime, so the stale-session race the
|
||||
/// read/history paths guard against (re-read Session inside <c>_gate</c>, not before)
|
||||
/// can be exercised deterministically without standing up a live reconnect handler.
|
||||
/// Production code never calls this — the SDK reconnect handler owns the real swap.
|
||||
/// </summary>
|
||||
internal void SetSessionForTest(ISession? session) => Session = session;
|
||||
|
||||
private DriverHealth _health = new(DriverState.Unknown, null, null);
|
||||
private bool _disposed;
|
||||
/// <summary>URL of the endpoint the driver actually connected to. Exposed via <see cref="HostName"/>.</summary>
|
||||
@@ -1131,7 +1140,10 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit
|
||||
public async Task<ISubscriptionHandle> SubscribeAsync(
|
||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||||
{
|
||||
var session = RequireSession();
|
||||
// Fast-fail before queuing if the driver isn't connected, but do NOT bind the
|
||||
// subscription to this reference — a reconnect can swap Session while we wait on _gate.
|
||||
// The session actually used is re-read inside the gate (Driver.OpcUaClient-001/-006).
|
||||
_ = RequireSession();
|
||||
var id = Interlocked.Increment(ref _nextSubscriptionId);
|
||||
var handle = new OpcUaSubscriptionHandle(id);
|
||||
|
||||
@@ -1157,6 +1169,11 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit
|
||||
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
// Re-read Session inside the critical section: a reconnect completing while we were
|
||||
// blocked on _gate has already swapped in the new session via OnReconnectComplete.
|
||||
var session = Session
|
||||
?? throw new InvalidOperationException(
|
||||
"OpcUaClient session was lost before the subscription could be created.");
|
||||
session.AddSubscription(subscription);
|
||||
await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
@@ -1296,7 +1313,10 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit
|
||||
public async Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
|
||||
IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
|
||||
{
|
||||
var session = RequireSession();
|
||||
// Fast-fail before queuing if the driver isn't connected, but do NOT bind the alarm
|
||||
// subscription to this reference — a reconnect can swap Session while we wait on _gate.
|
||||
// The session actually used is re-read inside the gate (Driver.OpcUaClient-001/-006).
|
||||
_ = RequireSession();
|
||||
var id = Interlocked.Increment(ref _nextAlarmSubscriptionId);
|
||||
var handle = new OpcUaAlarmSubscriptionHandle(id);
|
||||
|
||||
@@ -1345,6 +1365,11 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit
|
||||
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
// Re-read Session inside the critical section: a reconnect completing while we were
|
||||
// blocked on _gate has already swapped in the new session via OnReconnectComplete.
|
||||
var session = Session
|
||||
?? throw new InvalidOperationException(
|
||||
"OpcUaClient session was lost before the alarm subscription could be created.");
|
||||
session.AddSubscription(subscription);
|
||||
await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
@@ -1410,31 +1435,42 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit
|
||||
// list without guarding the size themselves — e.g. a bulk-ack UI that built an empty
|
||||
// list because the filter matched nothing.
|
||||
if (acknowledgements.Count == 0) return;
|
||||
var session = RequireSession();
|
||||
|
||||
// OPC UA A&C: call the AcknowledgeableConditionType.Acknowledge method on each
|
||||
// condition node with EventId + Comment arguments. CallAsync accepts a batch —
|
||||
// one CallMethodRequest per ack.
|
||||
var callRequests = new CallMethodRequestCollection();
|
||||
foreach (var ack in acknowledgements)
|
||||
{
|
||||
if (!TryParseNodeId(session, ack.ConditionId, out var conditionId)) continue;
|
||||
callRequests.Add(new CallMethodRequest
|
||||
{
|
||||
ObjectId = conditionId,
|
||||
MethodId = MethodIds.AcknowledgeableConditionType_Acknowledge,
|
||||
InputArguments = [
|
||||
new Variant(Array.Empty<byte>()), // EventId — server-side best-effort; empty resolves to 'most recent'
|
||||
new Variant(new LocalizedText(ack.Comment ?? string.Empty)),
|
||||
],
|
||||
});
|
||||
}
|
||||
|
||||
if (callRequests.Count == 0) return;
|
||||
// Fast-fail before queuing if the driver isn't connected, but do NOT bind the ack calls
|
||||
// (or the namespace-relative ConditionId parse) to this reference — a reconnect can swap
|
||||
// Session while we wait on _gate. The session actually used is re-read inside the gate
|
||||
// (Driver.OpcUaClient-001/-006).
|
||||
_ = RequireSession();
|
||||
|
||||
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
// Re-read Session inside the critical section: a reconnect completing while we were
|
||||
// blocked on _gate has already swapped in the new session via OnReconnectComplete.
|
||||
// ConditionId parsing is namespace-relative, so it must use the current session's
|
||||
// namespace table too.
|
||||
var session = Session;
|
||||
if (session is null) return;
|
||||
|
||||
// OPC UA A&C: call the AcknowledgeableConditionType.Acknowledge method on each
|
||||
// condition node with EventId + Comment arguments. CallAsync accepts a batch —
|
||||
// one CallMethodRequest per ack.
|
||||
var callRequests = new CallMethodRequestCollection();
|
||||
foreach (var ack in acknowledgements)
|
||||
{
|
||||
if (!TryParseNodeId(session, ack.ConditionId, out var conditionId)) continue;
|
||||
callRequests.Add(new CallMethodRequest
|
||||
{
|
||||
ObjectId = conditionId,
|
||||
MethodId = MethodIds.AcknowledgeableConditionType_Acknowledge,
|
||||
InputArguments = [
|
||||
new Variant(Array.Empty<byte>()), // EventId — server-side best-effort; empty resolves to 'most recent'
|
||||
new Variant(new LocalizedText(ack.Comment ?? string.Empty)),
|
||||
],
|
||||
});
|
||||
}
|
||||
|
||||
if (callRequests.Count == 0) return;
|
||||
|
||||
try
|
||||
{
|
||||
var resp = await session.CallAsync(
|
||||
@@ -1615,20 +1651,30 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit
|
||||
private async Task<Core.Abstractions.HistoryReadResult> ExecuteHistoryReadAsync(
|
||||
string fullReference, ExtensionObject historyReadDetails, CancellationToken ct)
|
||||
{
|
||||
var session = RequireSession();
|
||||
if (!TryParseNodeId(session, fullReference, out var nodeId))
|
||||
{
|
||||
return new Core.Abstractions.HistoryReadResult([], null);
|
||||
}
|
||||
|
||||
var nodesToRead = new HistoryReadValueIdCollection
|
||||
{
|
||||
new HistoryReadValueId { NodeId = nodeId },
|
||||
};
|
||||
// Make sure a session exists before queuing on the gate, but do NOT bind the wire call
|
||||
// (or the namespace-relative NodeId parse) to this reference — a reconnect can swap
|
||||
// Session while we wait on _gate. The session actually used is re-read inside the gate
|
||||
// (Driver.OpcUaClient-001/-006).
|
||||
_ = RequireSession();
|
||||
|
||||
await _gate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
// Re-read Session inside the critical section: if a reconnect completed while we
|
||||
// were blocked on _gate, OnReconnectComplete has already swapped in the new session.
|
||||
// NodeId parsing is namespace-relative, so it must also use the current session's
|
||||
// namespace table.
|
||||
var session = Session;
|
||||
if (session is null || !TryParseNodeId(session, fullReference, out var nodeId))
|
||||
{
|
||||
return new Core.Abstractions.HistoryReadResult([], null);
|
||||
}
|
||||
|
||||
var nodesToRead = new HistoryReadValueIdCollection
|
||||
{
|
||||
new HistoryReadValueId { NodeId = nodeId },
|
||||
};
|
||||
|
||||
var resp = await session.HistoryReadAsync(
|
||||
requestHeader: null,
|
||||
historyReadDetails: historyReadDetails,
|
||||
@@ -1785,38 +1831,47 @@ public sealed class OpcUaClientDriver : IDriver, ITagDiscovery, IReadable, IWrit
|
||||
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var session = RequireSession();
|
||||
|
||||
NodeId notifierNodeId;
|
||||
if (string.IsNullOrEmpty(sourceName))
|
||||
{
|
||||
notifierNodeId = ObjectIds.Server;
|
||||
}
|
||||
else if (!TryParseNodeId(session, sourceName, out var parsed))
|
||||
{
|
||||
return new Core.Abstractions.HistoricalEventsResult([], null);
|
||||
}
|
||||
else
|
||||
{
|
||||
notifierNodeId = parsed;
|
||||
}
|
||||
|
||||
var details = new ReadEventDetails
|
||||
{
|
||||
StartTime = startUtc,
|
||||
EndTime = endUtc,
|
||||
NumValuesPerNode = maxEvents <= 0 ? 0u : (uint)maxEvents,
|
||||
Filter = BuildBaseEventFilter(),
|
||||
};
|
||||
|
||||
var nodesToRead = new HistoryReadValueIdCollection
|
||||
{
|
||||
new HistoryReadValueId { NodeId = notifierNodeId },
|
||||
};
|
||||
// Confirm a session exists before queuing; the session actually used — and the
|
||||
// namespace-relative source-NodeId parse — are re-read inside the gate so a reconnect
|
||||
// mid-wait can't leave us reading from a closed session (Driver.OpcUaClient-001/-006).
|
||||
_ = RequireSession();
|
||||
|
||||
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
// Re-read Session inside the critical section: OnReconnectComplete may have swapped
|
||||
// in a new session while we were blocked on _gate, and the source-NodeId parse is
|
||||
// namespace-relative so it must bind against the current session's namespace table.
|
||||
var session = Session;
|
||||
if (session is null) return new Core.Abstractions.HistoricalEventsResult([], null);
|
||||
|
||||
NodeId notifierNodeId;
|
||||
if (string.IsNullOrEmpty(sourceName))
|
||||
{
|
||||
notifierNodeId = ObjectIds.Server;
|
||||
}
|
||||
else if (!TryParseNodeId(session, sourceName, out var parsed))
|
||||
{
|
||||
return new Core.Abstractions.HistoricalEventsResult([], null);
|
||||
}
|
||||
else
|
||||
{
|
||||
notifierNodeId = parsed;
|
||||
}
|
||||
|
||||
var details = new ReadEventDetails
|
||||
{
|
||||
StartTime = startUtc,
|
||||
EndTime = endUtc,
|
||||
NumValuesPerNode = maxEvents <= 0 ? 0u : (uint)maxEvents,
|
||||
Filter = BuildBaseEventFilter(),
|
||||
};
|
||||
|
||||
var nodesToRead = new HistoryReadValueIdCollection
|
||||
{
|
||||
new HistoryReadValueId { NodeId = notifierNodeId },
|
||||
};
|
||||
|
||||
var resp = await session.HistoryReadAsync(
|
||||
requestHeader: null,
|
||||
historyReadDetails: new ExtensionObject(details),
|
||||
|
||||
Reference in New Issue
Block a user