Files
lmxopcua/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs
T

393 lines
20 KiB
C#

using Akka.Actor;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
public sealed class DriverInstanceActorTests : RuntimeActorTestBase
{
/// <summary>Verifies that ApplyDelta calls ReinitializeAsync when connected and replies success.</summary>
[Fact]
public async Task ApplyDelta_when_Connected_calls_ReinitializeAsync_and_replies_success()
{
var driver = new StubDriver();
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver));
// Drive: Initialize → Connected.
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
// Issue ApplyDelta and capture the reply via Ask.
var correlation = CorrelationId.NewId();
var reply = await actor.Ask<DriverInstanceActor.ApplyResult>(
new DriverInstanceActor.ApplyDelta("{\"changed\":true}", correlation),
TimeSpan.FromSeconds(3));
reply.Success.ShouldBeTrue();
reply.Correlation.ShouldBe(correlation);
driver.ReinitializeCount.ShouldBe(1);
}
/// <summary>Verifies that initialize failure keeps the actor in Reconnecting state.</summary>
[Fact]
public void Initialize_failure_keeps_actor_in_Reconnecting_state()
{
var driver = new StubDriver { InitializeShouldThrow = true };
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50)));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
// The actor should keep trying — we expect multiple Initialize calls because the
// reconnect timer fires every 50ms.
AwaitCondition(() => driver.InitializeCount >= 3, TimeSpan.FromSeconds(2));
}
/// <summary>Verifies that writing to a non-IWritable driver returns failure.</summary>
[Fact]
public async Task Write_against_non_IWritable_driver_returns_failure()
{
var driver = new StubDriver(); // IDriver only, no IWritable.
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
var reply = await actor.Ask<DriverInstanceActor.WriteAttributeResult>(
new DriverInstanceActor.WriteAttribute("tag-1", 42),
TimeSpan.FromSeconds(3));
reply.Success.ShouldBeFalse();
reply.Reason!.ShouldContain("IWritable");
}
/// <summary>Verifies that writing to an IWritable driver returns success when status is Good.</summary>
[Fact]
public async Task Write_against_IWritable_returns_success_when_status_is_Good()
{
var driver = new WritableStubDriver();
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
var reply = await actor.Ask<DriverInstanceActor.WriteAttributeResult>(
new DriverInstanceActor.WriteAttribute("tag-1", 42),
TimeSpan.FromSeconds(3));
reply.Success.ShouldBeTrue();
driver.Writes.Single().FullReference.ShouldBe("tag-1");
driver.Writes.Single().Value.ShouldBe(42);
}
/// <summary>Verifies that write propagates status code on Bad result.</summary>
[Fact]
public async Task Write_propagates_status_code_on_Bad_result()
{
const uint badStatus = 0x80340000; // BadOutOfService — top severity bits = 10b
var driver = new WritableStubDriver { NextStatusCode = badStatus };
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
var reply = await actor.Ask<DriverInstanceActor.WriteAttributeResult>(
new DriverInstanceActor.WriteAttribute("tag-1", 42),
TimeSpan.FromSeconds(3));
reply.Success.ShouldBeFalse();
reply.Reason!.ShouldContain("80340000");
}
/// <summary>Verifies that subscribing to an ISubscribable driver forwards OnDataChange to parent.</summary>
[Fact]
public async Task Subscribe_against_ISubscribable_forwards_OnDataChange_to_parent()
{
var driver = new SubscribableStubDriver();
var parent = CreateTestProbe();
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(250)),
TimeSpan.FromSeconds(3));
// Driver fires an OnDataChange — actor should forward it to its parent as
// AttributeValuePublished with Quality mapped from StatusCode.
driver.FireDataChange("tag-a", value: 3.14, statusCode: 0u);
var published = parent.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(TimeSpan.FromSeconds(2));
published.FullReference.ShouldBe("tag-a");
published.Value.ShouldBe(3.14);
published.Quality.ShouldBe(OpcUaQuality.Good);
}
/// <summary>Verifies that subscribe translates OPC UA status severity bits to OpcUaQuality.</summary>
[Fact]
public async Task Subscribe_translates_OPC_UA_status_severity_bits_to_OpcUaQuality()
{
var driver = new SubscribableStubDriver();
var parent = CreateTestProbe();
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
new DriverInstanceActor.Subscribe(new[] { "tag-1" }, TimeSpan.FromMilliseconds(100)),
TimeSpan.FromSeconds(3));
// Uncertain — severity bits 01 (top 2 bits = 01).
driver.FireDataChange("tag-1", value: 1, statusCode: 0x40000000u);
parent.ExpectMsg<DriverInstanceActor.AttributeValuePublished>().Quality.ShouldBe(OpcUaQuality.Uncertain);
// Bad — severity bits 10.
driver.FireDataChange("tag-1", value: 2, statusCode: 0x80000000u);
parent.ExpectMsg<DriverInstanceActor.AttributeValuePublished>().Quality.ShouldBe(OpcUaQuality.Bad);
}
/// <summary>
/// Verifies the SubscribeBulk pass: SetDesiredSubscriptions retains the ref set and the actor
/// auto-subscribes when it (re)enters Connected — including a re-subscribe after a reconnect,
/// closing the F8b/#113 gap that previously left galaxy variables at BadWaitingForInitialData.
/// </summary>
[Fact]
public async Task SetDesiredSubscriptions_auto_subscribes_on_connect_and_resubscribes_after_reconnect()
{
var driver = new SubscribableStubDriver();
var parent = CreateTestProbe();
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50)));
// Desired set arrives BEFORE connect — retained, not yet applied.
actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(
new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(100)));
// Connecting → Connected triggers the auto-subscribe.
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(2));
driver.LastSubscribedRefs.ShouldBe(new[] { "tag-a", "tag-b" });
// The auto-subscription is live — a data change reaches the parent.
driver.FireDataChange("tag-a", value: 7, statusCode: 0u);
parent.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(TimeSpan.FromSeconds(2)).Value.ShouldBe(7);
// Reconnect → the desired set is re-established without any new host message.
actor.Tell(new DriverInstanceActor.DisconnectObserved("backend blip"));
AwaitCondition(() => driver.SubscribeCount >= 2, TimeSpan.FromSeconds(3));
}
/// <summary>
/// Verifies the re-subscribe path (the second Subscribe finds a live handle and first awaits
/// UnsubscribeAsync) still replies SubscriptionEstablished. Regression for the no-ActorContext
/// race: reading Sender after `await UnsubscribeAsync().ConfigureAwait(false)` resumed off the
/// actor context and threw, so the reply never arrived. This drives the exact deploy-re-apply /
/// bootstrap-restore path where `_subscriptionHandle is not null`.
/// </summary>
[Fact]
public async Task Subscribe_twice_replies_SubscriptionEstablished_on_resubscribe()
{
// UnsubscribeYields makes the inner UnsubscribeAsync genuinely suspend, so the second
// Subscribe's `await UnsubscribeAsync()` resumes off the actor context if ConfigureAwait(false)
// is used — the exact condition that throws NotSupportedException on the subsequent Sender read.
var driver = new SubscribableStubDriver { UnsubscribeYields = true };
var parent = CreateTestProbe();
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
// First subscribe — establishes the handle.
await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(250)),
TimeSpan.FromSeconds(3));
// Second subscribe — `_subscriptionHandle is not null`, so the handler awaits
// UnsubscribeAsync first, then reads Sender. Must still reply (today it threw → no reply).
var reply = await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(250)),
TimeSpan.FromSeconds(3));
reply.ReferenceCount.ShouldBe(2);
driver.SubscribeCount.ShouldBe(2);
// Old handler must have been detached before the new one was attached — no leak.
driver.OnDataChangeSubscriberCount.ShouldBe(1);
}
/// <summary>Verifies that subscribing to a non-ISubscribable driver replies with failure.</summary>
[Fact]
public async Task Subscribe_against_non_ISubscribable_replies_with_failure()
{
var driver = new StubDriver(); // IDriver only
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
var reply = await actor.Ask<DriverInstanceActor.SubscriptionFailed>(
new DriverInstanceActor.Subscribe(new[] { "tag-1" }, TimeSpan.FromMilliseconds(100)),
TimeSpan.FromSeconds(3));
reply.Reason.ShouldContain("ISubscribable");
}
/// <summary>Verifies that DisconnectObserved detaches subscription handler so late events are dropped.</summary>
[Fact]
public async Task DisconnectObserved_detaches_subscription_handler_so_late_events_are_dropped()
{
var driver = new SubscribableStubDriver();
var parent = CreateTestProbe();
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromSeconds(30)));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
new DriverInstanceActor.Subscribe(new[] { "tag-1" }, TimeSpan.FromMilliseconds(100)),
TimeSpan.FromSeconds(3));
actor.Tell(new DriverInstanceActor.DisconnectObserved("backend went away"));
// Race window — once disconnect is processed, subsequent FireDataChange calls hit a
// detached handler and don't push anything to the parent.
AwaitCondition(() => driver.OnDataChangeSubscriberCount == 0, TimeSpan.FromSeconds(2));
driver.FireDataChange("tag-1", value: 99, statusCode: 0u);
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
}
private class StubDriver : IDriver
{
/// <summary>Gets or sets a value indicating whether initialization should throw.</summary>
public bool InitializeShouldThrow { get; set; }
/// <summary>Gets the number of times initialization was called.</summary>
public int InitializeCount;
/// <summary>Gets the number of times reinitialization was called.</summary>
public int ReinitializeCount;
/// <summary>Gets the driver instance ID.</summary>
public string DriverInstanceId => "stub-driver-1";
/// <summary>Gets the driver type.</summary>
public string DriverType => "Stub";
/// <summary>Initializes the driver with the specified configuration JSON.</summary>
/// <param name="driverConfigJson">The driver configuration JSON.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
Interlocked.Increment(ref InitializeCount);
if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail");
return Task.CompletedTask;
}
/// <summary>Reinitializes the driver with the specified configuration JSON.</summary>
/// <param name="driverConfigJson">The driver configuration JSON.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
Interlocked.Increment(ref ReinitializeCount);
return Task.CompletedTask;
}
/// <summary>Shuts down the driver.</summary>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>Gets the health status of the driver.</summary>
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);
/// <summary>Gets the memory footprint of the driver.</summary>
public long GetMemoryFootprint() => 0;
/// <summary>Flushes optional caches in the driver.</summary>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
private sealed class WritableStubDriver : StubDriver, IWritable
{
/// <summary>Gets or sets the next status code to return from write operations.</summary>
public uint NextStatusCode { get; set; } = 0u;
/// <summary>Gets the list of write requests received.</summary>
public List<WriteRequest> Writes { get; } = new();
/// <summary>Writes the specified requests.</summary>
/// <param name="writes">The write requests.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
{
Writes.AddRange(writes);
IReadOnlyList<WriteResult> results = writes.Select(_ => new WriteResult(NextStatusCode)).ToList();
return Task.FromResult(results);
}
}
private sealed class SubscribableStubDriver : StubDriver, ISubscribable
{
/// <summary>Occurs when data changes.</summary>
public event EventHandler<DataChangeEventArgs>? OnDataChange;
private readonly StubHandle _handle = new();
/// <summary>Gets the number of subscribers to OnDataChange.</summary>
public int OnDataChangeSubscriberCount => OnDataChange?.GetInvocationList().Length ?? 0;
/// <summary>Number of times <see cref="SubscribeAsync"/> was called (re-subscribe asserts).</summary>
public int SubscribeCount;
/// <summary>The reference set passed to the most recent <see cref="SubscribeAsync"/> call.</summary>
public IReadOnlyList<string>? LastSubscribedRefs;
/// <summary>When true, <see cref="UnsubscribeAsync"/> genuinely yields (`await Task.Yield()`)
/// before completing, so a `ConfigureAwait(false)` continuation in the actor resumes off the
/// Akka ActorContext on a thread-pool thread — reproducing the no-ActorContext race that a
/// synchronously-completed stub task hides (the continuation otherwise runs inline).</summary>
public bool UnsubscribeYields { get; set; }
/// <summary>Subscribes to the specified full references.</summary>
/// <param name="fullReferences">The full references to subscribe to.</param>
/// <param name="publishingInterval">The publishing interval.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
Interlocked.Increment(ref SubscribeCount);
LastSubscribedRefs = fullReferences;
return Task.FromResult<ISubscriptionHandle>(_handle);
}
/// <summary>Unsubscribes from the specified subscription handle.</summary>
/// <param name="handle">The subscription handle.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
if (UnsubscribeYields)
{
// Complete the awaited task from a fresh background thread that has NO Akka actor
// cell on it, so the caller's `ConfigureAwait(false)` continuation resumes on a
// clean thread-pool thread where InternalCurrentActorCellKeeper.Current is null —
// a deterministic repro of the real async-backend no-ActorContext race.
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_ = Task.Run(() => tcs.SetResult());
await tcs.Task.ConfigureAwait(false);
}
}
/// <summary>Fires a data change event with the specified parameters.</summary>
/// <param name="fullRef">The full reference of the data that changed.</param>
/// <param name="value">The new value.</param>
/// <param name="statusCode">The OPC UA status code.</param>
public void FireDataChange(string fullRef, object? value, uint statusCode)
{
var snapshot = new DataValueSnapshot(value, statusCode, DateTime.UtcNow, DateTime.UtcNow);
OnDataChange?.Invoke(this, new DataChangeEventArgs(_handle, fullRef, snapshot));
}
private sealed class StubHandle : ISubscriptionHandle
{
/// <summary>Gets the diagnostic ID of the subscription.</summary>
public string DiagnosticId => "stub-sub";
}
}
}