diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs index 8485e9ba..a873eef9 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs @@ -211,6 +211,11 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers private void Connecting() { Receive(msg => InitializeAsync(msg.DriverConfigJson)); + // Fast-fail writes while still connecting — without this the inbound WriteAttribute dead-letters + // and DriverHostActor.HandleRouteNodeWrite waits its full 8s Ask before reporting a generic + // "write timeout". Synchronous Receive: Sender.Tell on the actor thread is safe (#4a-instance). + Receive(_ => + Sender.Tell(new WriteAttributeResult(false, "driver not connected"))); Receive(_ => { _log.Info("DriverInstance {Id}: connected", _driverInstanceId); @@ -227,6 +232,11 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers }); Receive(StoreDesiredSubscriptions); Receive(_ => { /* already connecting — no-op */ }); + // ResubscribeDesired self-Tells Subscribe; HandleSubscribeAsync replies SubscriptionEstablished to the + // sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter. + Receive(msg => + _log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})", + _driverInstanceId, msg.ReferenceCount, msg.DiagnosticId)); Receive(_ => PublishHealthSnapshot()); } @@ -259,12 +269,21 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers else if (_subscriptionHandle is not null) Self.Tell(new Unsubscribe()); }); Receive(OnDataChangeForward); + // ResubscribeDesired self-Tells Subscribe; HandleSubscribeAsync replies SubscriptionEstablished to the + // sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter. + Receive(msg => + _log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})", + _driverInstanceId, msg.ReferenceCount, msg.DiagnosticId)); Receive(_ => PublishHealthSnapshot()); } private void Reconnecting() { Receive(_ => InitializeAsync(_currentConfigJson ?? "{}")); + // Fast-fail writes while reconnecting (same reason as Connecting — avoids the 8s host Ask + // timeout on an inbound write to a transiently-down driver). Synchronous Receive (#4a-instance). + Receive(_ => + Sender.Tell(new WriteAttributeResult(false, "driver not connected"))); Receive(_ => { Timers.Cancel("retry-connect"); @@ -276,6 +295,11 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers Receive(_ => { /* keep retrying via timer */ }); Receive(StoreDesiredSubscriptions); Receive(_ => { /* already reconnecting — no-op */ }); + // ResubscribeDesired self-Tells Subscribe; HandleSubscribeAsync replies SubscriptionEstablished to the + // sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter. + Receive(msg => + _log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})", + _driverInstanceId, msg.ReferenceCount, msg.DiagnosticId)); Receive(_ => PublishHealthSnapshot()); Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval); } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorWriteAndSubscribeTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorWriteAndSubscribeTests.cs new file mode 100644 index 00000000..9fd57ece --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorWriteAndSubscribeTests.cs @@ -0,0 +1,274 @@ +using Akka.Actor; +using Akka.Event; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers; + +/// +/// Covers two robustness nits on the generic : +/// (1) writes that arrive while the actor is NOT in Connected (Stubbed / Connecting / +/// Reconnecting) must fast-fail with a negative +/// rather than dead-letter and let the host Ask wait its full 8s timeout; and +/// (2) the self-Tell Subscribe issued by ResubscribeDesired on (re)connect must not +/// leave a stray dead-lettering when its +/// reply lands back at Self. +/// +public sealed class DriverInstanceActorWriteAndSubscribeTests : RuntimeActorTestBase +{ + /// + /// A write to a Stubbed actor fast-fails synchronously: a Stubbed driver never connects, yet + /// it must answer a Ask with a negative result + /// well inside the host's 8s timeout instead of dead-lettering. + /// + [Fact] + public async Task WriteAttribute_to_stubbed_driver_fast_fails() + { + var driver = new WritableStubDriver(); + var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, startStubbed: true)); + + var reply = await actor.Ask( + new DriverInstanceActor.WriteAttribute("tag-1", 42), + TimeSpan.FromSeconds(2)); + + // Stubbed drivers deterministically succeed writes without touching hardware (existing behaviour). + reply.Success.ShouldBeTrue(); + } + + /// + /// A write to a Connecting actor (InitializeAsync still in flight, never resolves) fast-fails + /// with Success=false and a non-null reason in well under the 8s host Ask timeout — + /// proving the synchronous fast-fail in Connecting() fires instead of dead-lettering. + /// + [Fact] + public async Task WriteAttribute_to_connecting_driver_fast_fails() + { + // BlockingInitDriver.InitializeAsync never completes, so the actor stays in Connecting. + var driver = new BlockingInitDriver(); + var actor = Sys.ActorOf(DriverInstanceActor.Props(driver)); + + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + AwaitCondition(() => driver.InitializeStarted, TimeSpan.FromSeconds(2)); + + var sw = System.Diagnostics.Stopwatch.StartNew(); + var reply = await actor.Ask( + new DriverInstanceActor.WriteAttribute("tag-1", 42), + TimeSpan.FromSeconds(2)); + sw.Stop(); + + reply.Success.ShouldBeFalse(); + reply.Reason.ShouldNotBeNull(); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(2)); + } + + /// + /// A write to a Reconnecting actor (initial connect failed, retrying) fast-fails with a negative + /// result rather than dead-lettering and hanging the host Ask. + /// + [Fact] + public async Task WriteAttribute_to_reconnecting_driver_fast_fails() + { + // InitializeShouldThrow drives Connecting → Reconnecting; the slow reconnect interval keeps it + // parked there long enough to take a write. + var driver = new WritableStubDriver { InitializeShouldThrow = true }; + var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromSeconds(30))); + + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + // The first Initialize attempt throws and pushes the actor into Reconnecting. + AwaitCondition(() => driver.InitializeCount >= 1, TimeSpan.FromSeconds(2)); + + var reply = await actor.Ask( + new DriverInstanceActor.WriteAttribute("tag-1", 42), + TimeSpan.FromSeconds(2)); + + reply.Success.ShouldBeFalse(); + reply.Reason.ShouldNotBeNull(); + } + + /// + /// Driving a connect + auto-resubscribe cycle (the self-Tell Subscribe from + /// ResubscribeDesired whose SubscriptionEstablished reply lands at Self) must not + /// produce a dead-letter. + /// + [Fact] + public async Task Self_resubscribe_does_not_deadletter_SubscriptionEstablished() + { + // The probe IGNORES every DeadLetter except those carrying a SubscriptionEstablished payload, + // so its mailbox only ever holds the message we care about. That makes the assertion precise: + // any swallow miss surfaces as a SubscriptionEstablished DeadLetter, and nothing else + // (health-poll cruft, remoting-terminator letters) can give a false positive. + var deadLetters = CreateTestProbe(); + deadLetters.IgnoreMessages(m => m is not AllDeadLetters { Message: DriverInstanceActor.SubscriptionEstablished }); + Sys.EventStream.Subscribe(deadLetters.Ref, typeof(AllDeadLetters)); + + var driver = new SubscribableStubDriver(); + var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50))); + + // Desired set arrives first; on connect, ResubscribeDesired self-Tells Subscribe, whose + // SubscriptionEstablished reply lands at Self. + actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions( + new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(100))); + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(2)); + + // Force a reconnect to exercise the resubscribe path a second time. + actor.Tell(new DriverInstanceActor.DisconnectObserved("backend blip")); + AwaitCondition(() => driver.SubscribeCount >= 2, TimeSpan.FromSeconds(3)); + + // BARRIER: the self-resubscribe reply is published asynchronously, so SubscribeCount>=2 alone + // doesn't guarantee the actor has yet PROCESSED the self-sent SubscriptionEstablished (the point + // where it either swallows it or — pre-fix — dead-letters it). Drive a real Subscribe Ask from + // the TEST and await its reply: Akka processes the mailbox in order, so once this round-trips, + // every earlier self-resubscribe reply has already been handled and any dead-letter has already + // been published to the EventStream (Unhandled publishes synchronously). Only THEN is ExpectNoMsg + // race-free. + await actor.Ask( + new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(100)), + TimeSpan.FromSeconds(3)); + + deadLetters.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + } + + // --- stub drivers (mirrors DriverInstanceActorTests) ------------------------------------------ + + private class StubDriver : IDriver + { + /// Gets or sets a value indicating whether initialization should throw. + public bool InitializeShouldThrow { get; set; } + /// Gets the number of times initialization was called. + public int InitializeCount; + + /// Gets the driver instance ID. + public string DriverInstanceId => "stub-driver-1"; + /// Gets the driver type. + public string DriverType => "Stub"; + + /// Initializes the driver with the specified configuration JSON. + public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) + { + Interlocked.Increment(ref InitializeCount); + if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail"); + return Task.CompletedTask; + } + + /// Reinitializes the driver with the specified configuration JSON. + public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => + Task.CompletedTask; + + /// Shuts down the driver. + public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask; + /// Gets the health status of the driver. + public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null); + /// Gets the memory footprint of the driver. + public long GetMemoryFootprint() => 0; + /// Flushes optional caches in the driver. + public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask; + } + + /// + /// A standalone (and ) whose InitializeAsync + /// never completes, parking the actor in Connecting. Implements the interface directly + /// (not via ) so the never-completing Init is reached through the + /// polymorphic call the actor makes. + /// + private sealed class BlockingInitDriver : IDriver, IWritable + { + /// Set true the moment InitializeAsync is entered. + public volatile bool InitializeStarted; + private readonly TaskCompletionSource _gate = new(TaskCreationOptions.RunContinuationsAsynchronously); + + /// Gets the driver instance ID. + public string DriverInstanceId => "blocking-init-driver"; + /// Gets the driver type. + public string DriverType => "Stub"; + + /// Never completes — keeps the actor in Connecting. + public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) + { + InitializeStarted = true; + return _gate.Task; + } + + /// Reinitializes the driver with the specified configuration JSON. + public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => + Task.CompletedTask; + + /// Shuts down the driver. + public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask; + /// Gets the health status of the driver. + public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null); + /// Gets the memory footprint of the driver. + public long GetMemoryFootprint() => 0; + /// Flushes optional caches in the driver. + public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask; + + /// Writes the specified requests (never reached — actor fast-fails in Connecting). + public Task> WriteAsync( + IReadOnlyList writes, CancellationToken cancellationToken) + { + IReadOnlyList results = writes.Select(_ => new WriteResult(0u)).ToList(); + return Task.FromResult(results); + } + } + + private sealed class WritableStubDriver : StubDriver, IWritable + { + /// Gets or sets the next status code to return from write operations. + public uint NextStatusCode { get; set; } = 0u; + /// Gets the list of write requests received. + public List Writes { get; } = new(); + + /// Writes the specified requests. + public Task> WriteAsync( + IReadOnlyList writes, CancellationToken cancellationToken) + { + Writes.AddRange(writes); + IReadOnlyList results = writes.Select(_ => new WriteResult(NextStatusCode)).ToList(); + return Task.FromResult(results); + } + } + + private sealed class SubscribableStubDriver : StubDriver, ISubscribable + { + /// Occurs when data changes. + public event EventHandler? OnDataChange; + + private readonly StubHandle _handle = new(); + + /// Gets the number of subscribers to OnDataChange. + public int OnDataChangeSubscriberCount => OnDataChange?.GetInvocationList().Length ?? 0; + + /// Number of times was called. + public int SubscribeCount; + + /// Subscribes to the specified full references. + public Task SubscribeAsync( + IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) + { + Interlocked.Increment(ref SubscribeCount); + return Task.FromResult(_handle); + } + + /// Unsubscribes from the specified subscription handle. + public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) => + Task.CompletedTask; + + /// Fires a data change event with the specified parameters. + public void FireDataChange(string fullRef, object? value, uint statusCode) + { + var snapshot = new DataValueSnapshot(value, statusCode, DateTime.UtcNow, DateTime.UtcNow); + OnDataChange?.Invoke(this, new DataChangeEventArgs(_handle, fullRef, snapshot)); + } + + private sealed class StubHandle : ISubscriptionHandle + { + /// Gets the diagnostic ID of the subscription. + public string DiagnosticId => "stub-sub"; + } + } +}