Files

326 lines
16 KiB
C#

using Akka.Actor;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
public sealed class DriverInstanceActorTests : RuntimeActorTestBase
{
/// <summary>
/// Sends an ApplyDelta after the driver has been initialized and checks that the reply is a
/// successful ApplyResult echoing the request's correlation id, and that the stub driver
/// recorded exactly one reinitialize call.
/// </summary>
[Fact]
public async Task ApplyDelta_when_Connected_calls_ReinitializeAsync_and_replies_success()
{
    var stub = new StubDriver();
    var instance = Sys.ActorOf(DriverInstanceActor.Props(stub));

    // Bring the actor up: wait until the stub has seen at least one initialize call.
    instance.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => stub.InitializeCount > 0, TimeSpan.FromSeconds(2));

    // Ask (rather than Tell) so the ApplyResult comes back to the test directly.
    var correlationId = CorrelationId.NewId();
    var delta = new DriverInstanceActor.ApplyDelta("{\"changed\":true}", correlationId);
    var result = await instance.Ask<DriverInstanceActor.ApplyResult>(delta, TimeSpan.FromSeconds(3));

    result.Success.ShouldBeTrue();
    result.Correlation.ShouldBe(correlationId);
    stub.ReinitializeCount.ShouldBe(1);
}
/// <summary>
/// With a driver whose initialization always throws, the actor keeps retrying on its reconnect
/// interval: the stub observes repeated initialize attempts.
/// </summary>
[Fact]
public void Initialize_failure_keeps_actor_in_Reconnecting_state()
{
    var retryEvery = TimeSpan.FromMilliseconds(50);
    var failing = new StubDriver { InitializeShouldThrow = true };
    var instance = Sys.ActorOf(DriverInstanceActor.Props(failing, reconnectInterval: retryEvery));

    instance.Tell(new DriverInstanceActor.InitializeRequested("{}"));

    // At a 50 ms reconnect interval, three or more attempts inside two seconds shows the
    // actor is still cycling through retries rather than giving up after the first failure.
    AwaitCondition(() => failing.InitializeCount >= 3, TimeSpan.FromSeconds(2));
}
/// <summary>
/// Verifies that a WriteAttribute sent to an actor backed by a driver without write support
/// is answered with a failed WriteAttributeResult whose reason mentions "IWritable".
/// </summary>
[Fact]
public async Task Write_against_non_IWritable_driver_returns_failure()
{
    var driver = new StubDriver(); // IDriver only, no IWritable.
    var actor = Sys.ActorOf(DriverInstanceActor.Props(driver));
    actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));

    var reply = await actor.Ask<DriverInstanceActor.WriteAttributeResult>(
        new DriverInstanceActor.WriteAttribute("tag-1", 42),
        TimeSpan.FromSeconds(3));

    reply.Success.ShouldBeFalse();
    // Assert presence explicitly: the null-forgiving operator alone only silences the compiler,
    // so a missing reason would otherwise surface as an opaque failure inside ShouldContain.
    reply.Reason.ShouldNotBeNull();
    reply.Reason!.ShouldContain("IWritable");
}
/// <summary>
/// Verifies that a WriteAttribute sent to an actor backed by a writable stub driver is answered
/// with a successful WriteAttributeResult, and that the driver recorded exactly one write
/// carrying the requested reference and value.
/// </summary>
[Fact]
public async Task Write_against_IWritable_returns_success_when_status_is_Good()
{
    var driver = new WritableStubDriver();
    var actor = Sys.ActorOf(DriverInstanceActor.Props(driver));
    actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));

    var reply = await actor.Ask<DriverInstanceActor.WriteAttributeResult>(
        new DriverInstanceActor.WriteAttribute("tag-1", 42),
        TimeSpan.FromSeconds(3));

    reply.Success.ShouldBeTrue();
    // Capture the single recorded write once: one enumeration instead of two, and a Shouldly
    // failure message (instead of a bare InvalidOperationException) if the count is not 1.
    var write = driver.Writes.ShouldHaveSingleItem();
    write.FullReference.ShouldBe("tag-1");
    write.Value.ShouldBe(42);
}
/// <summary>
/// Verifies that when the writable stub driver reports a Bad status code for a write, the actor
/// replies with a failed WriteAttributeResult whose reason includes that status code in hex.
/// </summary>
[Fact]
public async Task Write_propagates_status_code_on_Bad_result()
{
    const uint badStatus = 0x80340000; // BadOutOfService — top severity bits = 10b
    var driver = new WritableStubDriver { NextStatusCode = badStatus };
    var actor = Sys.ActorOf(DriverInstanceActor.Props(driver));
    actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));

    var reply = await actor.Ask<DriverInstanceActor.WriteAttributeResult>(
        new DriverInstanceActor.WriteAttribute("tag-1", 42),
        TimeSpan.FromSeconds(3));

    reply.Success.ShouldBeFalse();
    // Assert presence explicitly: the null-forgiving operator alone only silences the compiler,
    // so a missing reason would otherwise surface as an opaque failure inside ShouldContain.
    reply.Reason.ShouldNotBeNull();
    reply.Reason!.ShouldContain("80340000");
}
/// <summary>
/// After a successful Subscribe, a data change raised by the driver is relayed to the actor's
/// parent as an AttributeValuePublished carrying the reference, the value, and Good quality
/// for status code 0.
/// </summary>
[Fact]
public async Task Subscribe_against_ISubscribable_forwards_OnDataChange_to_parent()
{
    var stub = new SubscribableStubDriver();
    var parentProbe = CreateTestProbe();
    var child = parentProbe.ChildActorOf(DriverInstanceActor.Props(stub));

    child.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => stub.InitializeCount > 0, TimeSpan.FromSeconds(2));

    var subscribe = new DriverInstanceActor.Subscribe(
        new[] { "tag-a", "tag-b" },
        TimeSpan.FromMilliseconds(250));
    await child.Ask<DriverInstanceActor.SubscriptionEstablished>(subscribe, TimeSpan.FromSeconds(3));

    // Raise a change on the stub; the probe, acting as the actor's parent, should receive the
    // forwarded AttributeValuePublished with Quality derived from the status code.
    stub.FireDataChange("tag-a", value: 3.14, statusCode: 0u);

    var message = parentProbe.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(TimeSpan.FromSeconds(2));
    message.FullReference.ShouldBe("tag-a");
    message.Value.ShouldBe(3.14);
    message.Quality.ShouldBe(OpcUaQuality.Good);
}
/// <summary>
/// Verifies that the two severity bits at the top of an OPC UA status code are translated to
/// OpcUaQuality on published values: 00 → Good, 01 → Uncertain, 10 → Bad.
/// </summary>
[Fact]
public async Task Subscribe_translates_OPC_UA_status_severity_bits_to_OpcUaQuality()
{
    // Same explicit wait the other forwarding tests use, instead of the TestKit default.
    var expectTimeout = TimeSpan.FromSeconds(2);

    var driver = new SubscribableStubDriver();
    var parent = CreateTestProbe();
    var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver));
    actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
    await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
        new DriverInstanceActor.Subscribe(new[] { "tag-1" }, TimeSpan.FromMilliseconds(100)),
        TimeSpan.FromSeconds(3));

    // Each event carries a distinct value, and the value is asserted alongside the quality so
    // every quality is tied to the event that produced it.

    // Good — severity bits 00.
    driver.FireDataChange("tag-1", value: 0, statusCode: 0u);
    var good = parent.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(expectTimeout);
    good.Value.ShouldBe(0);
    good.Quality.ShouldBe(OpcUaQuality.Good);

    // Uncertain — severity bits 01 (top 2 bits = 01).
    driver.FireDataChange("tag-1", value: 1, statusCode: 0x40000000u);
    var uncertain = parent.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(expectTimeout);
    uncertain.Value.ShouldBe(1);
    uncertain.Quality.ShouldBe(OpcUaQuality.Uncertain);

    // Bad — severity bits 10.
    driver.FireDataChange("tag-1", value: 2, statusCode: 0x80000000u);
    var bad = parent.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(expectTimeout);
    bad.Value.ShouldBe(2);
    bad.Quality.ShouldBe(OpcUaQuality.Bad);
}
/// <summary>
/// Verifies the SubscribeBulk pass: SetDesiredSubscriptions retains the ref set and the actor
/// auto-subscribes when it (re)enters Connected — including a re-subscribe after a reconnect,
/// closing the F8b/#113 gap that previously left galaxy variables at BadWaitingForInitialData.
/// </summary>
/// <remarks>
/// Every step here is a blocking TestKit call and nothing is awaited, so the test is a plain
/// synchronous fact rather than an <c>async Task</c> method with no <c>await</c> (CS1998).
/// </remarks>
[Fact]
public void SetDesiredSubscriptions_auto_subscribes_on_connect_and_resubscribes_after_reconnect()
{
    var driver = new SubscribableStubDriver();
    var parent = CreateTestProbe();
    var actor = parent.ChildActorOf(
        DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50)));

    // Desired set arrives BEFORE connect — retained, not yet applied.
    actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(
        new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(100)));

    // Connecting → Connected triggers the auto-subscribe.
    actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(2));
    driver.LastSubscribedRefs.ShouldBe(new[] { "tag-a", "tag-b" });

    // The auto-subscription is live — a data change reaches the parent.
    driver.FireDataChange("tag-a", value: 7, statusCode: 0u);
    parent.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(TimeSpan.FromSeconds(2)).Value.ShouldBe(7);

    // Reconnect → the desired set is re-established without any new host message.
    actor.Tell(new DriverInstanceActor.DisconnectObserved("backend blip"));
    AwaitCondition(() => driver.SubscribeCount >= 2, TimeSpan.FromSeconds(3));
}
/// <summary>
/// Regression test for the re-subscribe path: a second Subscribe arrives while a subscription
/// handle is already live, so the handler first awaits UnsubscribeAsync and only then replies
/// with SubscriptionEstablished. The original failure was a no-ActorContext race — after
/// `await UnsubscribeAsync().ConfigureAwait(false)` the continuation resumed off the actor
/// context, reading Sender threw, and the reply never arrived. This is the same path taken on
/// deploy-re-apply / bootstrap-restore, where `_subscriptionHandle is not null`.
/// </summary>
[Fact]
public async Task Subscribe_twice_replies_SubscriptionEstablished_on_resubscribe()
{
    // With UnsubscribeYields set, the stub's UnsubscribeAsync genuinely suspends. That pushes the
    // continuation after the second Subscribe's `await UnsubscribeAsync()` off the actor context
    // whenever ConfigureAwait(false) is used — exactly the condition under which the subsequent
    // Sender read throws NotSupportedException.
    var stub = new SubscribableStubDriver { UnsubscribeYields = true };
    var parentProbe = CreateTestProbe();
    var child = parentProbe.ChildActorOf(DriverInstanceActor.Props(stub));
    var askTimeout = TimeSpan.FromSeconds(3);

    child.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => stub.InitializeCount > 0, TimeSpan.FromSeconds(2));

    // Round one: creates the subscription handle.
    var firstSubscribe = new DriverInstanceActor.Subscribe(
        new[] { "tag-a", "tag-b" },
        TimeSpan.FromMilliseconds(250));
    await child.Ask<DriverInstanceActor.SubscriptionEstablished>(firstSubscribe, askTimeout);

    // Round two: `_subscriptionHandle is not null`, so the handler awaits UnsubscribeAsync
    // before it reads Sender. A reply must still arrive (before the fix the read threw and
    // nothing was sent back).
    var secondSubscribe = new DriverInstanceActor.Subscribe(
        new[] { "tag-a", "tag-b" },
        TimeSpan.FromMilliseconds(250));
    var established = await child.Ask<DriverInstanceActor.SubscriptionEstablished>(secondSubscribe, askTimeout);

    established.ReferenceCount.ShouldBe(2);
    stub.SubscribeCount.ShouldBe(2);

    // Exactly one handler attached: the old one was detached before the new one went on, so
    // nothing leaked.
    stub.OnDataChangeSubscriberCount.ShouldBe(1);
}
/// <summary>
/// A Subscribe sent to an actor backed by a plain stub driver is answered with
/// SubscriptionFailed, and the failure reason mentions "ISubscribable".
/// </summary>
[Fact]
public async Task Subscribe_against_non_ISubscribable_replies_with_failure()
{
    // Plain stub: IDriver only.
    var stub = new StubDriver();
    var instance = Sys.ActorOf(DriverInstanceActor.Props(stub));

    instance.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => stub.InitializeCount > 0, TimeSpan.FromSeconds(2));

    var subscribe = new DriverInstanceActor.Subscribe(new[] { "tag-1" }, TimeSpan.FromMilliseconds(100));
    var failure = await instance.Ask<DriverInstanceActor.SubscriptionFailed>(subscribe, TimeSpan.FromSeconds(3));

    failure.Reason.ShouldContain("ISubscribable");
}
/// <summary>
/// Once DisconnectObserved has been handled, the actor's data-change handler is no longer
/// attached to the driver, so an event raised afterwards never reaches the parent.
/// </summary>
[Fact]
public async Task DisconnectObserved_detaches_subscription_handler_so_late_events_are_dropped()
{
    var stub = new SubscribableStubDriver();
    var parentProbe = CreateTestProbe();
    // NOTE(review): the 30 s reconnect interval presumably keeps a reconnect from happening
    // inside the test window — confirm against the actor's timer handling.
    var child = parentProbe.ChildActorOf(
        DriverInstanceActor.Props(stub, reconnectInterval: TimeSpan.FromSeconds(30)));

    child.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => stub.InitializeCount > 0, TimeSpan.FromSeconds(2));

    var subscribe = new DriverInstanceActor.Subscribe(new[] { "tag-1" }, TimeSpan.FromMilliseconds(100));
    await child.Ask<DriverInstanceActor.SubscriptionEstablished>(subscribe, TimeSpan.FromSeconds(3));

    child.Tell(new DriverInstanceActor.DisconnectObserved("backend went away"));

    // The disconnect is processed asynchronously, so wait until the stub reports no attached
    // handlers. From then on a fired change has nowhere to go and the parent stays silent.
    AwaitCondition(() => stub.OnDataChangeSubscriberCount == 0, TimeSpan.FromSeconds(2));
    stub.FireDataChange("tag-1", value: 99, statusCode: 0u);

    parentProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
}
/// <summary>
/// A driver stuck Reconnecting because initialization keeps failing on a bad config picks up a
/// corrected config delivered via ApplyDelta and connects on it, with no node restart.
/// Closes pending.md #7.
/// </summary>
[Fact]
public async Task ApplyDelta_while_Reconnecting_adopts_new_config_and_connects()
{
    const string rejectedConfig = "{\"v\":\"bad\"}";
    const string acceptedConfig = "{\"v\":\"good\"}";

    var stub = new SubscribableStubDriver
    {
        // Only the accepted config initializes; any other config throws.
        InitBehavior = cfg =>
        {
            if (cfg == acceptedConfig)
            {
                return Task.CompletedTask;
            }

            throw new InvalidOperationException("bad-cfg");
        },
    };
    var parentProbe = CreateTestProbe();
    var child = parentProbe.ChildActorOf(
        DriverInstanceActor.Props(stub, reconnectInterval: TimeSpan.FromMilliseconds(50)));

    child.Tell(new DriverInstanceActor.SetDesiredSubscriptions(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100)));
    child.Tell(new DriverInstanceActor.InitializeRequested(rejectedConfig));

    // Two or more initialize attempts means the actor has retried at least once on the rejected
    // config; no subscription may have been made in that state.
    AwaitCondition(() => stub.InitializeCount >= 2, TimeSpan.FromSeconds(2));
    stub.SubscribeCount.ShouldBe(0);

    var correlationId = CorrelationId.NewId();
    var result = await child.Ask<DriverInstanceActor.ApplyResult>(
        new DriverInstanceActor.ApplyDelta(acceptedConfig, correlationId),
        TimeSpan.FromSeconds(3));
    result.Success.ShouldBeTrue();
    result.Correlation.ShouldBe(correlationId);

    // The desired subscription only gets applied once the actor connects, so seeing it proves
    // the corrected config was used for a successful initialize.
    AwaitCondition(() => stub.SubscribeCount >= 1, TimeSpan.FromSeconds(3));
    stub.InitConfigs.ShouldContain(acceptedConfig);
}
/// <summary>
/// A stale InitializeSucceeded from an old (superseded) config cannot hijack the state. While a
/// gated old-config init is still pending in Connecting, an ApplyDelta adopts a new config; when
/// the old init completes afterwards it is ignored, and only the new config drives Connected.
/// </summary>
[Fact]
public async Task ApplyDelta_supersedes_in_flight_init_so_stale_result_is_ignored()
{
    const string supersededConfig = "{\"v\":1}";
    const string adoptedConfig = "{\"v\":2}";

    // One gate per config so the test decides when each initialize call completes.
    var supersededGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var adoptedGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    var stub = new SubscribableStubDriver
    {
        InitBehavior = cfg => cfg == supersededConfig ? supersededGate.Task : adoptedGate.Task,
    };
    var parentProbe = CreateTestProbe();
    var child = parentProbe.ChildActorOf(
        DriverInstanceActor.Props(stub, reconnectInterval: TimeSpan.FromSeconds(30)));

    child.Tell(new DriverInstanceActor.SetDesiredSubscriptions(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100)));
    child.Tell(new DriverInstanceActor.InitializeRequested(supersededConfig));
    AwaitCondition(() => stub.InitConfigs.Contains(supersededConfig), TimeSpan.FromSeconds(2));

    // Adopt the new config while the first initialize is still parked on its gate.
    var delta = new DriverInstanceActor.ApplyDelta(adoptedConfig, CorrelationId.NewId());
    await child.Ask<DriverInstanceActor.ApplyResult>(delta, TimeSpan.FromSeconds(3));
    AwaitCondition(() => stub.InitConfigs.Contains(adoptedConfig), TimeSpan.FromSeconds(2));

    // Let the superseded initialize finish: nothing may reach the parent and no subscription
    // may be made on the strength of that stale result.
    supersededGate.SetResult();
    parentProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(400));
    stub.SubscribeCount.ShouldBe(0);

    // Completing the adopted config's initialize is what finally produces the subscription.
    adoptedGate.SetResult();
    AwaitCondition(() => stub.SubscribeCount >= 1, TimeSpan.FromSeconds(3));
}
}