326 lines
16 KiB
C#
326 lines
16 KiB
C#
using Akka.Actor;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
|
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
|
|
|
|
public sealed class DriverInstanceActorTests : RuntimeActorTestBase
|
|
{
|
|
/// <summary>
/// After the driver has been initialized, an <c>ApplyDelta</c> must trigger exactly one
/// re-initialization and be answered with a successful <c>ApplyResult</c> that echoes the
/// caller's correlation id.
/// </summary>
[Fact]
public async Task ApplyDelta_when_Connected_calls_ReinitializeAsync_and_replies_success()
{
    // Arrange — start the actor and wait until the driver has seen its first initialize call.
    var stub = new StubDriver();
    var sut = Sys.ActorOf(DriverInstanceActor.Props(stub));
    var correlationId = CorrelationId.NewId();

    sut.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => stub.InitializeCount > 0, TimeSpan.FromSeconds(2));

    // Act — Ask so the reply is routed back to this test.
    var delta = new DriverInstanceActor.ApplyDelta("{\"changed\":true}", correlationId);
    var result = await sut.Ask<DriverInstanceActor.ApplyResult>(delta, TimeSpan.FromSeconds(3));

    // Assert
    result.Success.ShouldBeTrue();
    result.Correlation.ShouldBe(correlationId);
    stub.ReinitializeCount.ShouldBe(1);
}
|
|
|
|
/// <summary>
/// When driver initialization throws, the actor keeps retrying on its reconnect interval;
/// the test observes this as repeated initialize attempts on the stub.
/// </summary>
[Fact]
public void Initialize_failure_keeps_actor_in_Reconnecting_state()
{
    var retryEvery = TimeSpan.FromMilliseconds(50);
    var failingDriver = new StubDriver { InitializeShouldThrow = true };
    var sut = Sys.ActorOf(DriverInstanceActor.Props(failingDriver, reconnectInterval: retryEvery));

    sut.Tell(new DriverInstanceActor.InitializeRequested("{}"));

    // With a 50 ms reconnect timer, at least three attempts must show up inside the 2 s budget.
    AwaitCondition(() => failingDriver.InitializeCount >= 3, TimeSpan.FromSeconds(2));
}
|
|
|
|
/// <summary>
/// A write sent to an actor whose driver does not implement IWritable is answered with a
/// failed <c>WriteAttributeResult</c> whose reason mentions "IWritable".
/// </summary>
[Fact]
public async Task Write_against_non_IWritable_driver_returns_failure()
{
    // StubDriver is IDriver only — it offers no write capability.
    var readOnlyDriver = new StubDriver();
    var sut = Sys.ActorOf(DriverInstanceActor.Props(readOnlyDriver));

    sut.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => readOnlyDriver.InitializeCount > 0, TimeSpan.FromSeconds(2));

    var write = new DriverInstanceActor.WriteAttribute("tag-1", 42);
    var outcome = await sut.Ask<DriverInstanceActor.WriteAttributeResult>(write, TimeSpan.FromSeconds(3));

    outcome.Success.ShouldBeFalse();
    outcome.Reason!.ShouldContain("IWritable");
}
|
|
|
|
/// <summary>
/// A write against a writable driver (stub left at its default status) succeeds, and the
/// driver records exactly one write carrying the requested reference and value.
/// </summary>
[Fact]
public async Task Write_against_IWritable_returns_success_when_status_is_Good()
{
    var writable = new WritableStubDriver();
    var sut = Sys.ActorOf(DriverInstanceActor.Props(writable));

    sut.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => writable.InitializeCount > 0, TimeSpan.FromSeconds(2));

    var write = new DriverInstanceActor.WriteAttribute("tag-1", 42);
    var outcome = await sut.Ask<DriverInstanceActor.WriteAttributeResult>(write, TimeSpan.FromSeconds(3));

    outcome.Success.ShouldBeTrue();

    // Single() also asserts that the driver saw one — and only one — write.
    var recorded = writable.Writes.Single();
    recorded.FullReference.ShouldBe("tag-1");
    recorded.Value.ShouldBe(42);
}
|
|
|
|
/// <summary>
/// When the driver reports a Bad status for a write, the actor replies with a failed
/// <c>WriteAttributeResult</c> whose reason contains the status code in hexadecimal.
/// </summary>
[Fact]
public async Task Write_propagates_status_code_on_Bad_result()
{
    // 0x80340000 is BadNodeIdUnknown in the OPC UA status code table (BadOutOfService is
    // 0x808D0000). Any code whose top two severity bits are 10b counts as Bad.
    const uint badStatus = 0x80340000;
    var driver = new WritableStubDriver { NextStatusCode = badStatus };
    var actor = Sys.ActorOf(DriverInstanceActor.Props(driver));

    actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));

    var reply = await actor.Ask<DriverInstanceActor.WriteAttributeResult>(
        new DriverInstanceActor.WriteAttribute("tag-1", 42),
        TimeSpan.FromSeconds(3));

    reply.Success.ShouldBeFalse();

    // Derive the expected text ("80340000") from the constant so the two cannot drift apart.
    reply.Reason!.ShouldContain($"{badStatus:X8}");
}
|
|
|
|
/// <summary>
/// Once a subscription is established, a data change raised by the driver is delivered to the
/// actor's parent as <c>AttributeValuePublished</c>; status code 0 surfaces as Good quality.
/// </summary>
[Fact]
public async Task Subscribe_against_ISubscribable_forwards_OnDataChange_to_parent()
{
    var source = new SubscribableStubDriver();
    var parentProbe = CreateTestProbe();
    var sut = parentProbe.ChildActorOf(DriverInstanceActor.Props(source));

    sut.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => source.InitializeCount > 0, TimeSpan.FromSeconds(2));

    var subscribe = new DriverInstanceActor.Subscribe(
        new[] { "tag-a", "tag-b" },
        TimeSpan.FromMilliseconds(250));
    await sut.Ask<DriverInstanceActor.SubscriptionEstablished>(subscribe, TimeSpan.FromSeconds(3));

    // The driver raises OnDataChange; the parent probe should receive the forwarded value.
    source.FireDataChange("tag-a", value: 3.14, statusCode: 0u);

    var forwarded = parentProbe.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(TimeSpan.FromSeconds(2));
    forwarded.FullReference.ShouldBe("tag-a");
    forwarded.Value.ShouldBe(3.14);
    forwarded.Quality.ShouldBe(OpcUaQuality.Good);
}
|
|
|
|
/// <summary>
/// The quality on a forwarded data change follows the status code's top two severity bits:
/// <c>01</c> is published as Uncertain and <c>10</c> as Bad.
/// </summary>
[Fact]
public async Task Subscribe_translates_OPC_UA_status_severity_bits_to_OpcUaQuality()
{
    var source = new SubscribableStubDriver();
    var parentProbe = CreateTestProbe();
    var sut = parentProbe.ChildActorOf(DriverInstanceActor.Props(source));

    sut.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => source.InitializeCount > 0, TimeSpan.FromSeconds(2));

    var subscribe = new DriverInstanceActor.Subscribe(new[] { "tag-1" }, TimeSpan.FromMilliseconds(100));
    await sut.Ask<DriverInstanceActor.SubscriptionEstablished>(subscribe, TimeSpan.FromSeconds(3));

    // Severity 01 → Uncertain.
    source.FireDataChange("tag-1", value: 1, statusCode: 0x40000000u);
    var uncertain = parentProbe.ExpectMsg<DriverInstanceActor.AttributeValuePublished>();
    uncertain.Quality.ShouldBe(OpcUaQuality.Uncertain);

    // Severity 10 → Bad.
    source.FireDataChange("tag-1", value: 2, statusCode: 0x80000000u);
    var bad = parentProbe.ExpectMsg<DriverInstanceActor.AttributeValuePublished>();
    bad.Quality.ShouldBe(OpcUaQuality.Bad);
}
|
|
|
|
/// <summary>
/// Verifies the SubscribeBulk pass: SetDesiredSubscriptions retains the ref set and the actor
/// auto-subscribes when it (re)enters Connected — including a re-subscribe after a reconnect,
/// closing the F8b/#113 gap that previously left galaxy variables at BadWaitingForInitialData.
/// </summary>
/// <remarks>
/// Every step here is a blocking TestKit call, so the test is a plain synchronous fact; declaring
/// it <c>async Task</c> without any <c>await</c> only produced compiler warning CS1998.
/// </remarks>
[Fact]
public void SetDesiredSubscriptions_auto_subscribes_on_connect_and_resubscribes_after_reconnect()
{
    var driver = new SubscribableStubDriver();
    var parent = CreateTestProbe();
    var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50)));

    // Desired set arrives BEFORE connect — retained, not yet applied.
    actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(
        new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(100)));

    // Connecting → Connected triggers the auto-subscribe.
    actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(2));
    driver.LastSubscribedRefs.ShouldBe(new[] { "tag-a", "tag-b" });

    // The auto-subscription is live — a data change reaches the parent.
    driver.FireDataChange("tag-a", value: 7, statusCode: 0u);
    parent.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(TimeSpan.FromSeconds(2)).Value.ShouldBe(7);

    // Reconnect → the desired set is re-established without any new host message.
    actor.Tell(new DriverInstanceActor.DisconnectObserved("backend blip"));
    AwaitCondition(() => driver.SubscribeCount >= 2, TimeSpan.FromSeconds(3));
}
|
|
|
|
/// <summary>
/// Regression test for the re-subscribe path: a second Subscribe that finds a live handle first
/// awaits UnsubscribeAsync and must still reply SubscriptionEstablished. The original failure
/// was a no-ActorContext race — Sender was read after
/// `await UnsubscribeAsync().ConfigureAwait(false)` had resumed off the actor context, which
/// threw, so the reply never arrived. This is the deploy-re-apply / bootstrap-restore path where
/// `_subscriptionHandle is not null`.
/// </summary>
[Fact]
public async Task Subscribe_twice_replies_SubscriptionEstablished_on_resubscribe()
{
    // UnsubscribeYields forces the inner UnsubscribeAsync to genuinely suspend. If the handler
    // used ConfigureAwait(false), its continuation would resume off the actor context — exactly
    // the condition under which the following Sender read throws NotSupportedException.
    var source = new SubscribableStubDriver { UnsubscribeYields = true };
    var parentProbe = CreateTestProbe();
    var sut = parentProbe.ChildActorOf(DriverInstanceActor.Props(source));
    var askTimeout = TimeSpan.FromSeconds(3);

    sut.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => source.InitializeCount > 0, TimeSpan.FromSeconds(2));

    // First subscribe — establishes the handle.
    var first = new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(250));
    await sut.Ask<DriverInstanceActor.SubscriptionEstablished>(first, askTimeout);

    // Second subscribe — the handle is live, so the handler unsubscribes first and only then
    // reads Sender. A reply must still come back (before the fix it threw and nothing arrived).
    var second = new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(250));
    var established = await sut.Ask<DriverInstanceActor.SubscriptionEstablished>(second, askTimeout);

    established.ReferenceCount.ShouldBe(2);
    source.SubscribeCount.ShouldBe(2);

    // Exactly one handler attached: the old one was detached before the new one was added.
    source.OnDataChangeSubscriberCount.ShouldBe(1);
}
|
|
|
|
/// <summary>
/// A Subscribe sent to an actor whose driver does not implement ISubscribable is answered with
/// <c>SubscriptionFailed</c>, and the reason mentions "ISubscribable".
/// </summary>
[Fact]
public async Task Subscribe_against_non_ISubscribable_replies_with_failure()
{
    // StubDriver is IDriver only — no subscription capability.
    var plainDriver = new StubDriver();
    var sut = Sys.ActorOf(DriverInstanceActor.Props(plainDriver));

    sut.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => plainDriver.InitializeCount > 0, TimeSpan.FromSeconds(2));

    var subscribe = new DriverInstanceActor.Subscribe(new[] { "tag-1" }, TimeSpan.FromMilliseconds(100));
    var failure = await sut.Ask<DriverInstanceActor.SubscriptionFailed>(subscribe, TimeSpan.FromSeconds(3));

    failure.Reason.ShouldContain("ISubscribable");
}
|
|
|
|
/// <summary>
/// After <c>DisconnectObserved</c> the actor detaches its data-change handler from the driver,
/// so an event fired afterwards never reaches the parent.
/// </summary>
[Fact]
public async Task DisconnectObserved_detaches_subscription_handler_so_late_events_are_dropped()
{
    // A long (30 s) reconnect interval — presumably so no reconnect happens inside the test window.
    var reconnectEvery = TimeSpan.FromSeconds(30);
    var source = new SubscribableStubDriver();
    var parentProbe = CreateTestProbe();
    var sut = parentProbe.ChildActorOf(DriverInstanceActor.Props(source, reconnectInterval: reconnectEvery));

    sut.Tell(new DriverInstanceActor.InitializeRequested("{}"));
    AwaitCondition(() => source.InitializeCount > 0, TimeSpan.FromSeconds(2));

    var subscribe = new DriverInstanceActor.Subscribe(new[] { "tag-1" }, TimeSpan.FromMilliseconds(100));
    await sut.Ask<DriverInstanceActor.SubscriptionEstablished>(subscribe, TimeSpan.FromSeconds(3));

    sut.Tell(new DriverInstanceActor.DisconnectObserved("backend went away"));

    // Wait for the disconnect to be processed (handler count drops to zero) before firing;
    // otherwise the event could race the detach and legitimately reach the parent.
    AwaitCondition(() => source.OnDataChangeSubscriberCount == 0, TimeSpan.FromSeconds(2));

    source.FireDataChange("tag-1", value: 99, statusCode: 0u);
    parentProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
}
|
|
|
|
/// <summary>
/// A driver stuck retrying because its config makes initialization fail picks up a corrected
/// config delivered via ApplyDelta and connects on it, with no node restart (pending.md #7).
/// </summary>
[Fact]
public async Task ApplyDelta_while_Reconnecting_adopts_new_config_and_connects()
{
    const string bad = "{\"v\":\"bad\"}";
    const string good = "{\"v\":\"good\"}";

    // Initialization succeeds only for the corrected config; anything else throws.
    var source = new SubscribableStubDriver
    {
        InitBehavior = cfg =>
        {
            if (cfg != good)
            {
                throw new InvalidOperationException("bad-cfg");
            }

            return Task.CompletedTask;
        },
    };
    var parentProbe = CreateTestProbe();
    var sut = parentProbe.ChildActorOf(
        DriverInstanceActor.Props(source, reconnectInterval: TimeSpan.FromMilliseconds(50)));

    sut.Tell(new DriverInstanceActor.SetDesiredSubscriptions(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100)));

    // Start on the bad config and let at least two attempts fail; nothing is subscribed so far.
    sut.Tell(new DriverInstanceActor.InitializeRequested(bad));
    AwaitCondition(() => source.InitializeCount >= 2, TimeSpan.FromSeconds(2));
    source.SubscribeCount.ShouldBe(0);

    // Deliver the corrected config.
    var correlationId = CorrelationId.NewId();
    var delta = new DriverInstanceActor.ApplyDelta(good, correlationId);
    var result = await sut.Ask<DriverInstanceActor.ApplyResult>(delta, TimeSpan.FromSeconds(3));

    result.Success.ShouldBeTrue();
    result.Correlation.ShouldBe(correlationId);

    // The desired set being subscribed shows the actor connected on the new config.
    AwaitCondition(() => source.SubscribeCount >= 1, TimeSpan.FromSeconds(3));
    source.InitConfigs.ShouldContain(good);
}
|
|
|
|
/// <summary>
/// A stale initialize result from a superseded config must not change the actor's state. While
/// a gated init for the old config is still pending, an ApplyDelta adopts a new config; the old
/// init completing afterwards is ignored, and only the new config's completion leads to the
/// desired subscriptions being applied.
/// </summary>
[Fact]
public async Task ApplyDelta_supersedes_in_flight_init_so_stale_result_is_ignored()
{
    const string v1 = "{\"v\":1}";
    const string v2 = "{\"v\":2}";

    // One gate per config lets the test choose the order in which the two inits complete.
    var staleInitGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var currentInitGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var source = new SubscribableStubDriver
    {
        InitBehavior = cfg => cfg == v1 ? staleInitGate.Task : currentInitGate.Task,
    };
    var parentProbe = CreateTestProbe();
    var sut = parentProbe.ChildActorOf(
        DriverInstanceActor.Props(source, reconnectInterval: TimeSpan.FromSeconds(30)));

    sut.Tell(new DriverInstanceActor.SetDesiredSubscriptions(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100)));

    // The v1 init starts and stays pending behind its gate.
    sut.Tell(new DriverInstanceActor.InitializeRequested(v1));
    AwaitCondition(() => source.InitConfigs.Contains(v1), TimeSpan.FromSeconds(2));

    // v2 supersedes v1 while v1 is still in flight.
    var delta = new DriverInstanceActor.ApplyDelta(v2, CorrelationId.NewId());
    await sut.Ask<DriverInstanceActor.ApplyResult>(delta, TimeSpan.FromSeconds(3));
    AwaitCondition(() => source.InitConfigs.Contains(v2), TimeSpan.FromSeconds(2));

    // Completing the stale v1 init must have no visible effect.
    staleInitGate.SetResult();
    parentProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(400));
    source.SubscribeCount.ShouldBe(0);

    // Completing the v2 init is what applies the desired subscriptions.
    currentInitGate.SetResult();
    AwaitCondition(() => source.SubscribeCount >= 1, TimeSpan.FromSeconds(3));
}
|
|
|
|
}
|