test(drivers): extract shared stub-driver harness (de-dup)

This commit is contained in:
Joseph Doherty
2026-06-14 22:49:26 -04:00
parent d8129e5ab7
commit c03361de1b
3 changed files with 158 additions and 229 deletions
@@ -322,142 +322,4 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(3));
}
private class StubDriver : IDriver
{
/// <summary>Gets or sets a value indicating whether initialization should throw.</summary>
public bool InitializeShouldThrow { get; set; }
/// <summary>Gets the number of times initialization was called.</summary>
public int InitializeCount;
/// <summary>Gets the number of times reinitialization was called.</summary>
public int ReinitializeCount;
private readonly object _initConfigsLock = new();
/// <summary>Every config string passed to <see cref="InitializeAsync"/>, in call order.</summary>
public List<string> InitConfigs { get; } = new();
/// <summary>Optional per-config init behaviour. When set, it fully owns the init outcome for that
/// config (await/throw); <see cref="InitializeShouldThrow"/> is ignored. Null ⇒ legacy behaviour.</summary>
public Func<string, Task>? InitBehavior { get; set; }
/// <summary>Gets the driver instance ID.</summary>
public string DriverInstanceId => "stub-driver-1";
/// <summary>Gets the driver type.</summary>
public string DriverType => "Stub";
/// <summary>Initializes the driver with the specified configuration JSON.</summary>
/// <param name="driverConfigJson">The driver configuration JSON.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
Interlocked.Increment(ref InitializeCount);
lock (_initConfigsLock) InitConfigs.Add(driverConfigJson);
if (InitBehavior is not null) { await InitBehavior(driverConfigJson); return; }
if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail");
}
/// <summary>Reinitializes the driver with the specified configuration JSON.</summary>
/// <param name="driverConfigJson">The driver configuration JSON.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
Interlocked.Increment(ref ReinitializeCount);
return Task.CompletedTask;
}
/// <summary>Shuts down the driver.</summary>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>Gets the health status of the driver.</summary>
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);
/// <summary>Gets the memory footprint of the driver.</summary>
public long GetMemoryFootprint() => 0;
/// <summary>Flushes optional caches in the driver.</summary>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
private sealed class WritableStubDriver : StubDriver, IWritable
{
/// <summary>Gets or sets the next status code to return from write operations.</summary>
public uint NextStatusCode { get; set; } = 0u;
/// <summary>Gets the list of write requests received.</summary>
public List<WriteRequest> Writes { get; } = new();
/// <summary>Writes the specified requests.</summary>
/// <param name="writes">The write requests.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
{
Writes.AddRange(writes);
IReadOnlyList<WriteResult> results = writes.Select(_ => new WriteResult(NextStatusCode)).ToList();
return Task.FromResult(results);
}
}
private sealed class SubscribableStubDriver : StubDriver, ISubscribable
{
/// <summary>Occurs when data changes.</summary>
public event EventHandler<DataChangeEventArgs>? OnDataChange;
private readonly StubHandle _handle = new();
/// <summary>Gets the number of subscribers to OnDataChange.</summary>
public int OnDataChangeSubscriberCount => OnDataChange?.GetInvocationList().Length ?? 0;
/// <summary>Number of times <see cref="SubscribeAsync"/> was called (re-subscribe asserts).</summary>
public int SubscribeCount;
/// <summary>The reference set passed to the most recent <see cref="SubscribeAsync"/> call.</summary>
public IReadOnlyList<string>? LastSubscribedRefs;
/// <summary>When true, <see cref="UnsubscribeAsync"/> genuinely yields (`await Task.Yield()`)
/// before completing, so a `ConfigureAwait(false)` continuation in the actor resumes off the
/// Akka ActorContext on a thread-pool thread — reproducing the no-ActorContext race that a
/// synchronously-completed stub task hides (the continuation otherwise runs inline).</summary>
public bool UnsubscribeYields { get; set; }
/// <summary>Subscribes to the specified full references.</summary>
/// <param name="fullReferences">The full references to subscribe to.</param>
/// <param name="publishingInterval">The publishing interval.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
Interlocked.Increment(ref SubscribeCount);
LastSubscribedRefs = fullReferences;
return Task.FromResult<ISubscriptionHandle>(_handle);
}
/// <summary>Unsubscribes from the specified subscription handle.</summary>
/// <param name="handle">The subscription handle.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
if (UnsubscribeYields)
{
// Complete the awaited task from a fresh background thread that has NO Akka actor
// cell on it, so the caller's `ConfigureAwait(false)` continuation resumes on a
// clean thread-pool thread where InternalCurrentActorCellKeeper.Current is null —
// a deterministic repro of the real async-backend no-ActorContext race.
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_ = Task.Run(() => tcs.SetResult());
await tcs.Task.ConfigureAwait(false);
}
}
/// <summary>Fires a data change event with the specified parameters.</summary>
/// <param name="fullRef">The full reference of the data that changed.</param>
/// <param name="value">The new value.</param>
/// <param name="statusCode">The OPC UA status code.</param>
public void FireDataChange(string fullRef, object? value, uint statusCode)
{
var snapshot = new DataValueSnapshot(value, statusCode, DateTime.UtcNow, DateTime.UtcNow);
OnDataChange?.Invoke(this, new DataChangeEventArgs(_handle, fullRef, snapshot));
}
private sealed class StubHandle : ISubscriptionHandle
{
/// <summary>Gets the diagnostic ID of the subscription.</summary>
public string DiagnosticId => "stub-sub";
}
}
}
@@ -173,42 +173,6 @@ public sealed class DriverInstanceActorWriteAndSubscribeTests : RuntimeActorTest
deadLetterProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
}
// --- stub drivers (mirrors DriverInstanceActorTests) ------------------------------------------
private class StubDriver : IDriver
{
/// <summary>Gets or sets a value indicating whether initialization should throw.</summary>
public bool InitializeShouldThrow { get; set; }
/// <summary>Gets the number of times initialization was called.</summary>
public int InitializeCount;
/// <summary>Gets the driver instance ID.</summary>
public string DriverInstanceId => "stub-driver-1";
/// <summary>Gets the driver type.</summary>
public string DriverType => "Stub";
/// <summary>Initializes the driver with the specified configuration JSON.</summary>
public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
Interlocked.Increment(ref InitializeCount);
if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail");
return Task.CompletedTask;
}
/// <summary>Reinitializes the driver with the specified configuration JSON.</summary>
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) =>
Task.CompletedTask;
/// <summary>Shuts down the driver.</summary>
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>Gets the health status of the driver.</summary>
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);
/// <summary>Gets the memory footprint of the driver.</summary>
public long GetMemoryFootprint() => 0;
/// <summary>Flushes optional caches in the driver.</summary>
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
/// <summary>
/// A standalone <see cref="IDriver"/> (and <see cref="IWritable"/>) whose <c>InitializeAsync</c>
/// never completes, parking the actor in <c>Connecting</c>. Implements the interface directly
@@ -255,59 +219,4 @@ public sealed class DriverInstanceActorWriteAndSubscribeTests : RuntimeActorTest
}
}
private sealed class WritableStubDriver : StubDriver, IWritable
{
/// <summary>Gets or sets the next status code to return from write operations.</summary>
public uint NextStatusCode { get; set; } = 0u;
/// <summary>Gets the list of write requests received.</summary>
public List<WriteRequest> Writes { get; } = new();
/// <summary>Writes the specified requests.</summary>
public Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
{
Writes.AddRange(writes);
IReadOnlyList<WriteResult> results = writes.Select(_ => new WriteResult(NextStatusCode)).ToList();
return Task.FromResult(results);
}
}
private sealed class SubscribableStubDriver : StubDriver, ISubscribable
{
/// <summary>Occurs when data changes.</summary>
public event EventHandler<DataChangeEventArgs>? OnDataChange;
private readonly StubHandle _handle = new();
/// <summary>Gets the number of subscribers to OnDataChange.</summary>
public int OnDataChangeSubscriberCount => OnDataChange?.GetInvocationList().Length ?? 0;
/// <summary>Number of times <see cref="SubscribeAsync"/> was called.</summary>
public int SubscribeCount;
/// <summary>Subscribes to the specified full references.</summary>
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
Interlocked.Increment(ref SubscribeCount);
return Task.FromResult<ISubscriptionHandle>(_handle);
}
/// <summary>Unsubscribes from the specified subscription handle.</summary>
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) =>
Task.CompletedTask;
/// <summary>Fires a data change event with the specified parameters.</summary>
public void FireDataChange(string fullRef, object? value, uint statusCode)
{
var snapshot = new DataValueSnapshot(value, statusCode, DateTime.UtcNow, DateTime.UtcNow);
OnDataChange?.Invoke(this, new DataChangeEventArgs(_handle, fullRef, snapshot));
}
private sealed class StubHandle : ISubscriptionHandle
{
/// <summary>Gets the diagnostic ID of the subscription.</summary>
public string DiagnosticId => "stub-sub";
}
}
}
@@ -0,0 +1,158 @@
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
/// <summary>
/// Shared <see cref="IDriver"/> stub harness used by <c>DriverInstanceActorTests</c> and
/// <c>DriverInstanceActorWriteAndSubscribeTests</c>. Promoted from the superset copy in
/// <c>DriverInstanceActorTests</c> so both suites compile against a single definition.
/// </summary>
internal class StubDriver : IDriver
{
/// <summary>Gets or sets a value indicating whether initialization should throw.</summary>
public bool InitializeShouldThrow { get; set; }
/// <summary>Gets the number of times initialization was called.</summary>
public int InitializeCount;
/// <summary>Gets the number of times reinitialization was called.</summary>
public int ReinitializeCount;
private readonly object _initConfigsLock = new();
/// <summary>Every config string passed to <see cref="InitializeAsync"/>, in call order.</summary>
public List<string> InitConfigs { get; } = new();
/// <summary>Optional per-config init behaviour. When set, it fully owns the init outcome for that
/// config (await/throw); <see cref="InitializeShouldThrow"/> is ignored. Null ⇒ legacy behaviour.</summary>
public Func<string, Task>? InitBehavior { get; set; }
/// <summary>Gets the driver instance ID.</summary>
public string DriverInstanceId => "stub-driver-1";
/// <summary>Gets the driver type.</summary>
public string DriverType => "Stub";
/// <summary>Initializes the driver with the specified configuration JSON.</summary>
/// <param name="driverConfigJson">The driver configuration JSON.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
Interlocked.Increment(ref InitializeCount);
lock (_initConfigsLock) InitConfigs.Add(driverConfigJson);
if (InitBehavior is not null) { await InitBehavior(driverConfigJson); return; }
if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail");
}
/// <summary>Reinitializes the driver with the specified configuration JSON.</summary>
/// <param name="driverConfigJson">The driver configuration JSON.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
Interlocked.Increment(ref ReinitializeCount);
return Task.CompletedTask;
}
/// <summary>Shuts down the driver.</summary>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>Gets the health status of the driver.</summary>
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);
/// <summary>Gets the memory footprint of the driver.</summary>
public long GetMemoryFootprint() => 0;
/// <summary>Flushes optional caches in the driver.</summary>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
/// <summary>
/// A <see cref="StubDriver"/> that also implements <see cref="IWritable"/>, recording every
/// write call and returning a configurable status code.
/// </summary>
internal sealed class WritableStubDriver : StubDriver, IWritable
{
/// <summary>Gets or sets the next status code to return from write operations.</summary>
public uint NextStatusCode { get; set; } = 0u;
/// <summary>Gets the list of write requests received.</summary>
public List<WriteRequest> Writes { get; } = new();
/// <summary>Writes the specified requests.</summary>
/// <param name="writes">The write requests.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
{
Writes.AddRange(writes);
IReadOnlyList<WriteResult> results = writes.Select(_ => new WriteResult(NextStatusCode)).ToList();
return Task.FromResult(results);
}
}
/// <summary>
/// A <see cref="StubDriver"/> that also implements <see cref="ISubscribable"/>, firing
/// <see cref="OnDataChange"/> on demand and counting subscribe calls.
/// </summary>
internal sealed class SubscribableStubDriver : StubDriver, ISubscribable
{
/// <summary>Occurs when data changes.</summary>
public event EventHandler<DataChangeEventArgs>? OnDataChange;
private readonly StubHandle _handle = new();
/// <summary>Gets the number of subscribers to OnDataChange.</summary>
public int OnDataChangeSubscriberCount => OnDataChange?.GetInvocationList().Length ?? 0;
/// <summary>Number of times <see cref="SubscribeAsync"/> was called (re-subscribe asserts).</summary>
public int SubscribeCount;
/// <summary>The reference set passed to the most recent <see cref="SubscribeAsync"/> call.</summary>
public IReadOnlyList<string>? LastSubscribedRefs;
/// <summary>When true, <see cref="UnsubscribeAsync"/> genuinely yields (<c>await Task.Yield()</c>)
/// before completing, so a <c>ConfigureAwait(false)</c> continuation in the actor resumes off the
/// Akka ActorContext on a thread-pool thread — reproducing the no-ActorContext race that a
/// synchronously-completed stub task hides (the continuation otherwise runs inline).</summary>
public bool UnsubscribeYields { get; set; }
/// <summary>Subscribes to the specified full references.</summary>
/// <param name="fullReferences">The full references to subscribe to.</param>
/// <param name="publishingInterval">The publishing interval.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
Interlocked.Increment(ref SubscribeCount);
LastSubscribedRefs = fullReferences;
return Task.FromResult<ISubscriptionHandle>(_handle);
}
/// <summary>Unsubscribes from the specified subscription handle.</summary>
/// <param name="handle">The subscription handle.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
if (UnsubscribeYields)
{
// Complete the awaited task from a fresh background thread that has NO Akka actor
// cell on it, so the caller's `ConfigureAwait(false)` continuation resumes on a
// clean thread-pool thread where InternalCurrentActorCellKeeper.Current is null —
// a deterministic repro of the real async-backend no-ActorContext race.
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_ = Task.Run(() => tcs.SetResult());
await tcs.Task.ConfigureAwait(false);
}
}
/// <summary>Fires a data change event with the specified parameters.</summary>
/// <param name="fullRef">The full reference of the data that changed.</param>
/// <param name="value">The new value.</param>
/// <param name="statusCode">The OPC UA status code.</param>
public void FireDataChange(string fullRef, object? value, uint statusCode)
{
var snapshot = new DataValueSnapshot(value, statusCode, DateTime.UtcNow, DateTime.UtcNow);
OnDataChange?.Invoke(this, new DataChangeEventArgs(_handle, fullRef, snapshot));
}
}
/// <summary>Minimal <see cref="ISubscriptionHandle"/> for use by <see cref="SubscribableStubDriver"/>.</summary>
internal sealed class StubHandle : ISubscriptionHandle
{
/// <summary>Gets the diagnostic ID of the subscription.</summary>
public string DiagnosticId => "stub-sub";
}