using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
/// <summary>
/// Shared stub <see cref="IDriver"/> harness used by <c>DriverInstanceActorTests</c> and
/// <c>DriverInstanceActorWriteAndSubscribeTests</c>. Promoted from the superset copy in
/// <c>DriverInstanceActorTests</c> so both suites compile against a single definition.
/// </summary>
internal class StubDriver : IDriver
{
/// <summary>
/// When <c>true</c> and <see cref="InitBehavior"/> is <c>null</c>,
/// <see cref="InitializeAsync"/> throws <see cref="InvalidOperationException"/> with the
/// message <c>stub-init-fail</c>. Has no effect while <see cref="InitBehavior"/> is set.
/// </summary>
public bool InitializeShouldThrow { get; set; }
/// <summary>
/// Number of <see cref="InitializeAsync"/> calls. Incremented on entry, so calls that go on
/// to throw are counted too. A field rather than a property: it is passed by <c>ref</c> to
/// <c>Interlocked.Increment</c>.
/// </summary>
public int InitializeCount;
/// <summary>
/// Number of <see cref="ReinitializeAsync"/> calls. A field rather than a property: it is
/// passed by <c>ref</c> to <c>Interlocked.Increment</c>.
/// </summary>
public int ReinitializeCount;
// Held only while appending to InitConfigs in InitializeAsync. The list itself is exposed
// directly, so reads by callers are not synchronized by this class.
private readonly object _initConfigsLock = new();
/// <summary>
/// Every config string passed to <see cref="InitializeAsync"/>, in call order. The config is
/// recorded before the init outcome is decided, so failed attempts appear here as well.
/// </summary>
public List InitConfigs { get; } = new();
/// <summary>
/// Optional per-config init behaviour. When set, it is invoked with the config JSON and
/// awaited, and it fully owns the init outcome for that config (await/throw);
/// <see cref="InitializeShouldThrow"/> is ignored. Null ⇒ legacy behaviour, i.e. the outcome
/// is driven by <see cref="InitializeShouldThrow"/> alone.
/// </summary>
public Func? InitBehavior { get; set; }
/// <summary>Gets the driver instance ID; always the fixed value <c>stub-driver-1</c>.</summary>
public string DriverInstanceId => "stub-driver-1";
/// <summary>Gets the driver type; always the fixed value <c>Stub</c>.</summary>
public string DriverType => "Stub";
/// <summary>
/// Records the call (count and config string) and then either delegates to
/// <see cref="InitBehavior"/> or, when that is <c>null</c>, fails or succeeds according to
/// <see cref="InitializeShouldThrow"/>.
/// </summary>
/// <param name="driverConfigJson">The driver configuration JSON; appended to <see cref="InitConfigs"/>.</param>
/// <param name="cancellationToken">Cancellation token for the operation. Not observed by this stub.</param>
/// <returns>A task that completes when the configured init behaviour has finished.</returns>
/// <exception cref="InvalidOperationException">
/// <see cref="InitBehavior"/> is <c>null</c> and <see cref="InitializeShouldThrow"/> is <c>true</c>.
/// Exceptions thrown by <see cref="InitBehavior"/> itself propagate unchanged.
/// </exception>
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
// Bookkeeping happens first so that throwing inits are still counted and recorded.
Interlocked.Increment(ref InitializeCount);
lock (_initConfigsLock) InitConfigs.Add(driverConfigJson);
// A configured behaviour takes over completely; the InitializeShouldThrow flag is not consulted.
if (InitBehavior is not null) { await InitBehavior(driverConfigJson); return; }
if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail");
}
/// <summary>
/// Counts the call in <see cref="ReinitializeCount"/> and completes immediately. The config
/// string is not recorded.
/// </summary>
/// <param name="driverConfigJson">The driver configuration JSON. Ignored by this stub.</param>
/// <param name="cancellationToken">Cancellation token for the operation. Not observed by this stub.</param>
/// <returns>An already-completed task.</returns>
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
Interlocked.Increment(ref ReinitializeCount);
return Task.CompletedTask;
}
/// <summary>Shuts down the driver. No-op in this stub.</summary>
/// <param name="cancellationToken">Cancellation token for the operation. Not observed by this stub.</param>
/// <returns>An already-completed task.</returns>
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>
/// Gets the health status of the driver. Always reports <c>DriverState.Healthy</c>, built
/// with the current UTC time and <c>null</c> for the last argument.
/// </summary>
/// <returns>A new <c>DriverHealth</c> value on every call.</returns>
// NOTE(review): the last two arguments are presumably a timestamp and an error detail — confirm against DriverHealth.
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);
/// <summary>Gets the memory footprint of the driver. Always <c>0</c> in this stub.</summary>
/// <returns>The constant <c>0</c>.</returns>
public long GetMemoryFootprint() => 0;
/// <summary>Flushes optional caches in the driver. No-op in this stub.</summary>
/// <param name="cancellationToken">Cancellation token for the operation. Not observed by this stub.</param>
/// <returns>An already-completed task.</returns>
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
/// <summary>
/// A <see cref="StubDriver"/> that also implements <see cref="IWritable"/>, recording every
/// write call and returning a configurable status code.
/// </summary>
internal sealed class WritableStubDriver : StubDriver, IWritable
{
/// <summary>
/// Status code placed in every <c>WriteResult</c> produced by subsequent
/// <see cref="WriteAsync"/> calls. Defaults to <c>0</c>.
/// </summary>
// NOTE(review): 0 is presumably the OPC UA "Good" status — confirm against the status-code constants in use.
public uint NextStatusCode { get; set; } = 0u;
/// <summary>
/// Every write request received by <see cref="WriteAsync"/>, appended in call order.
/// Appends are not synchronized by this class.
/// </summary>
public List Writes { get; } = new();
/// <summary>
/// Records the requests in <see cref="Writes"/> and returns one result per request, each
/// carrying the value of <see cref="NextStatusCode"/> at the time of the call.
/// </summary>
/// <param name="writes">The write requests.</param>
/// <param name="cancellationToken">Cancellation token for the operation. Not observed by this stub.</param>
/// <returns>An already-completed task holding one <c>WriteResult</c> per entry in <paramref name="writes"/>.</returns>
public Task> WriteAsync(
IReadOnlyList writes, CancellationToken cancellationToken)
{
Writes.AddRange(writes);
// ToList() materializes the results immediately, so NextStatusCode is read during this call
// rather than lazily when the caller enumerates.
IReadOnlyList results = writes.Select(_ => new WriteResult(NextStatusCode)).ToList();
return Task.FromResult(results);
}
}
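/// <summary>
/// A <see cref="StubDriver"/> that also implements <see cref="ISubscribable"/>, firing
/// <see cref="SubscribableStubDriver.OnDataChange"/> on demand and counting subscribe calls.
/// </summary>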
internal sealed class SubscribableStubDriver : StubDriver, ISubscribable
{
/// <summary>Occurs when data changes. Raised only by <see cref="FireDataChange"/>.</summary>
public event EventHandler? OnDataChange;
// Single handle instance: returned from every SubscribeAsync call and attached to every
// event raised by FireDataChange.
private readonly StubHandle _handle = new();
/// <summary>
/// Gets the number of handlers currently attached to <see cref="OnDataChange"/>; <c>0</c>
/// when there are none.
/// </summary>
public int OnDataChangeSubscriberCount => OnDataChange?.GetInvocationList().Length ?? 0;
/// <summary>
/// Number of times <see cref="SubscribeAsync"/> was called (re-subscribe asserts). A field
/// rather than a property: it is passed by <c>ref</c> to <c>Interlocked.Increment</c>.
/// </summary>
public int SubscribeCount;
/// <summary>
/// The reference set passed to the most recent <see cref="SubscribeAsync"/> call;
/// <c>null</c> until the first call.
/// </summary>
public IReadOnlyList? LastSubscribedRefs;
/// <summary>
/// When true, <see cref="UnsubscribeAsync"/> completes asynchronously instead of
/// synchronously: it awaits (with <c>ConfigureAwait(false)</c>) a task that is completed from
/// a <c>Task.Run</c> work item. The intent is that a <c>ConfigureAwait(false)</c> continuation
/// in the actor resumes off the Akka ActorContext on a thread-pool thread — reproducing the
/// no-ActorContext race that a synchronously-completed stub task hides (the continuation
/// otherwise runs inline). When false, <see cref="UnsubscribeAsync"/> completes synchronously.
/// </summary>
public bool UnsubscribeYields { get; set; }
/// <summary>
/// Counts the call, remembers the reference set in <see cref="LastSubscribedRefs"/> and
/// returns the stub's single subscription handle. No real subscription is created.
/// </summary>
/// <param name="fullReferences">The full references to subscribe to.</param>
/// <param name="publishingInterval">The publishing interval. Ignored by this stub.</param>
/// <param name="cancellationToken">Cancellation token for the operation. Not observed by this stub.</param>
/// <returns>An already-completed task holding the same handle instance on every call.</returns>
public Task SubscribeAsync(
IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
Interlocked.Increment(ref SubscribeCount);
LastSubscribedRefs = fullReferences;
return Task.FromResult(_handle);
}
/// <summary>
/// Completes without changing any state: handlers attached to <see cref="OnDataChange"/> are
/// left in place. Only the completion mode varies, controlled by
/// <see cref="UnsubscribeYields"/>.
/// </summary>
/// <param name="handle">The subscription handle. Ignored by this stub.</param>
/// <param name="cancellationToken">Cancellation token for the operation. Not observed by this stub.</param>
/// <returns>
/// A task that is already complete when <see cref="UnsubscribeYields"/> is false, and that
/// completes asynchronously when it is true.
/// </returns>
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
if (UnsubscribeYields)
{
// Complete the awaited task from a Task.Run work item, which is intended to have NO Akka
// actor cell on it, so the caller's `ConfigureAwait(false)` continuation resumes on a
// clean thread-pool thread where InternalCurrentActorCellKeeper.Current is null —
// a deterministic repro of the real async-backend no-ActorContext race.
// RunContinuationsAsynchronously keeps the continuation from running inline inside SetResult().
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_ = Task.Run(() => tcs.SetResult());
await tcs.Task.ConfigureAwait(false);
}
}
/// <summary>
/// Raises <see cref="OnDataChange"/> synchronously on the calling thread with a freshly built
/// snapshot. Does nothing when no handler is attached.
/// </summary>
/// <param name="fullRef">The full reference of the data that changed.</param>
/// <param name="value">The new value.</param>
/// <param name="statusCode">The OPC UA status code.</param>
public void FireDataChange(string fullRef, object? value, uint statusCode)
{
// NOTE(review): the two DateTime.UtcNow arguments are presumably the source and server
// timestamps — confirm against DataValueSnapshot. They are sampled separately and may differ slightly.
var snapshot = new DataValueSnapshot(value, statusCode, DateTime.UtcNow, DateTime.UtcNow);
OnDataChange?.Invoke(this, new DataChangeEventArgs(_handle, fullRef, snapshot));
}
}
/// <summary>Minimal <see cref="ISubscriptionHandle"/> for use by <see cref="SubscribableStubDriver"/>.</summary>
internal sealed class StubHandle : ISubscriptionHandle
{
    /// <summary>The fixed identifier reported by every instance of this handle.</summary>
    private const string FixedDiagnosticId = "stub-sub";

    /// <summary>Gets the diagnostic ID of the subscription; always <c>stub-sub</c>.</summary>
    public string DiagnosticId
    {
        get { return FixedDiagnosticId; }
    }
}