diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index 3e0d2fe9..9f3ad1f2 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -13,6 +13,7 @@ using ZB.MOM.WW.OtOpcUa.Configuration; using ZB.MOM.WW.OtOpcUa.Configuration.Entities; using ZB.MOM.WW.OtOpcUa.Configuration.Enums; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.OpcUaServer; using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId; namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers; @@ -40,6 +41,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers public const string DriverControlTopic = ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin.DriverControlTopic.Name; public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30); + /// Publishing interval handed to each driver's SubscribeBulk pass after an apply. + private static readonly TimeSpan SubscriptionPublishingInterval = TimeSpan.FromSeconds(1); + private readonly IDbContextFactory _dbFactory; private readonly CommonsNodeId _localNode; private readonly IActorRef? _coordinatorOverride; @@ -224,10 +228,17 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg) { - // Pass driver-published values to the dependency mux when one is wired. Without a mux, - // VirtualTagActor evaluation can't fire — values just drop here. That's the dev/Mac path - // (no virtual tags registered); production binds the mux via the RuntimeActors extension. + // Pass driver-published values to the dependency mux when one is wired (VirtualTag inputs). + // Without a mux, VirtualTagActor evaluation can't fire — that's the dev/Mac path (no virtual + // tags registered); production binds the mux via the RuntimeActors extension. _dependencyMux?.Tell(msg); + + // Also push the value to the OPC UA sink so the materialised variable reflects live data + // instead of staying BadWaitingForInitialData. For SystemPlatform / Galaxy tags the variable + // NodeId is exactly the dot-form MXAccess reference the driver subscribed to, so the published + // FullReference maps straight onto the sink NodeId. + _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AttributeValueUpdate( + msg.FullReference, msg.Value, msg.Quality, msg.TimestampUtc)); } private void Stale() @@ -300,6 +311,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers // composition. The publish actor handles the load-compose-diff-apply pipeline; we // just forward the same correlation id so the audit trail joins up. _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation)); + // SubscribeBulk pass: hand each driver its desired tag references so live values flow into + // the just-rebuilt address space instead of staying BadWaitingForInitialData. + PushDesiredSubscriptions(deploymentId); OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair("outcome", "ack")); _log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})", _localNode, deploymentId, revision, _children.Count); @@ -357,6 +371,66 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers foreach (var spec in plan.ToSpawn) SpawnChild(spec); } + /// + /// SubscribeBulk pass. After an apply, read the deployment's SystemPlatform / Galaxy tags, + /// group their dot-form MXAccess references by driver instance, and hand each running driver + /// child its desired subscription set via . + /// The child retains the set and (re)subscribes on every Connected entry, so values stream into + /// the OPC UA sink and resume after reconnects. Drivers with no configured tags get an empty set + /// (which clears any stale subscription from a previous deployment). + /// + private void PushDesiredSubscriptions(DeploymentId deploymentId) + { + byte[] blob; + try + { + using var db = _dbFactory.CreateDbContext(); + blob = db.Deployments.AsNoTracking() + .Where(d => d.DeploymentId == deploymentId.Value) + .Select(d => d.ArtifactBlob) + .FirstOrDefault() ?? Array.Empty(); + } + catch (Exception ex) + { + _log.Warning(ex, "DriverHost {Node}: failed to load artifact for SubscribeBulk ({Id})", _localNode, deploymentId); + return; + } + + Phase7CompositionResult composition; + try + { + composition = DeploymentArtifact.ParseComposition(blob); + } + catch (Exception ex) + { + _log.Warning(ex, "DriverHost {Node}: failed to parse composition for SubscribeBulk ({Id})", _localNode, deploymentId); + return; + } + + var refsByDriver = composition.GalaxyTags + .GroupBy(t => t.DriverInstanceId, StringComparer.Ordinal) + .ToDictionary( + g => g.Key, + g => (IReadOnlyList)g.Select(t => t.MxAccessRef) + .Distinct(StringComparer.Ordinal) + .ToArray(), + StringComparer.Ordinal); + + var total = 0; + foreach (var (driverId, entry) in _children) + { + var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty(); + entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval)); + total += refs.Count; + } + + if (total > 0) + { + _log.Info("DriverHost {Node}: SubscribeBulk pushed {Refs} references across {Drivers} driver(s)", + _localNode, total, refsByDriver.Count); + } + } + private void SpawnChild(DriverInstanceSpec spec) { var stub = DriverInstanceActor.ShouldStub(spec.DriverType, _localRoles); diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs index b6f8a296..f777f473 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs @@ -41,6 +41,15 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers public sealed record WriteAttribute(string TagId, object Value); public sealed record WriteAttributeResult(bool Success, string? Reason); public sealed record Subscribe(IReadOnlyList FullReferences, TimeSpan PublishingInterval); + /// + /// Sets the set of references this driver should keep subscribed for the lifetime of the + /// current deployment. Unlike the one-shot , the desired set is + /// retained and (re)established automatically every time the actor (re)enters + /// Connected — closing the F8b/#113 "re-subscribe across reconnects" gap and giving + /// a single message to drive the SubscribeBulk pass after an + /// apply. Sending an empty set clears the desired subscription. + /// + public sealed record SetDesiredSubscriptions(IReadOnlyList FullReferences, TimeSpan PublishingInterval); public sealed record SubscriptionEstablished(string DiagnosticId, int ReferenceCount); public sealed record SubscriptionFailed(string Reason); public sealed record Unsubscribe; @@ -85,6 +94,11 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers private ISubscriptionHandle? _subscriptionHandle; private EventHandler? _dataChangeHandler; + /// The references the host wants kept subscribed (set by ). + /// Re-applied on every entry into Connected so values resume after a reconnect or redeploy. + private IReadOnlyList _desiredRefs = Array.Empty(); + private TimeSpan _desiredInterval = TimeSpan.FromSeconds(1); + /// /// Gets or sets the timer scheduler for scheduling reconnection attempts. /// @@ -189,6 +203,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers Receive(_ => Sender.Tell(new WriteAttributeResult(true, "stubbed"))); Receive(_ => { /* stubbed drivers don't disconnect */ }); Receive(_ => { /* stubbed drivers don't reconnect */ }); + Receive(StoreDesiredSubscriptions); Receive(_ => PublishHealthSnapshot()); } @@ -200,6 +215,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers _log.Info("DriverInstance {Id}: connected", _driverInstanceId); Become(Connected); PublishHealthSnapshot(); + ResubscribeDesired(); }); Receive(msg => { @@ -208,6 +224,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers Become(Reconnecting); PublishHealthSnapshot(); }); + Receive(StoreDesiredSubscriptions); Receive(_ => { /* already connecting — no-op */ }); Receive(_ => PublishHealthSnapshot()); } @@ -234,6 +251,12 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers ReceiveAsync(HandleWriteAsync); ReceiveAsync(HandleSubscribeAsync); ReceiveAsync(_ => UnsubscribeAsync()); + Receive(msg => + { + StoreDesiredSubscriptions(msg); + if (_desiredRefs.Count > 0) Self.Tell(new Subscribe(_desiredRefs, _desiredInterval)); + else if (_subscriptionHandle is not null) Self.Tell(new Unsubscribe()); + }); Receive(OnDataChangeForward); Receive(_ => PublishHealthSnapshot()); } @@ -247,8 +270,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers _log.Info("DriverInstance {Id}: reconnected", _driverInstanceId); Become(Connected); PublishHealthSnapshot(); + ResubscribeDesired(); }); Receive(_ => { /* keep retrying via timer */ }); + Receive(StoreDesiredSubscriptions); Receive(_ => { /* already reconnecting — no-op */ }); Receive(_ => PublishHealthSnapshot()); Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval); @@ -393,6 +418,25 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers _subscriptionHandle = null; } + /// Records the host's desired subscription set without touching the live subscription. + /// The set is (re)applied by on the next Connected entry. + private void StoreDesiredSubscriptions(SetDesiredSubscriptions msg) + { + _desiredRefs = msg.FullReferences; + _desiredInterval = msg.PublishingInterval; + } + + /// Re-establish the desired subscription after (re)connecting. Self-sends the one-shot + /// the Connected behaviour already handles (which drops any prior handle + /// first), so values resume streaming after a reconnect or redeploy without host involvement. + private void ResubscribeDesired() + { + if (_desiredRefs.Count > 0) + { + Self.Tell(new Subscribe(_desiredRefs, _desiredInterval)); + } + } + private void OnDataChangeForward(DataChangeForward msg) { var quality = QualityFromStatus(msg.Snapshot.StatusCode); diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs index a62af1ba..252ca0e3 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs @@ -152,6 +152,36 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase parent.ExpectMsg().Quality.ShouldBe(OpcUaQuality.Bad); } + /// + /// Verifies the SubscribeBulk pass: SetDesiredSubscriptions retains the ref set and the actor + /// auto-subscribes when it (re)enters Connected — including a re-subscribe after a reconnect, + /// closing the F8b/#113 gap that previously left galaxy variables at BadWaitingForInitialData. + /// + [Fact] + public async Task SetDesiredSubscriptions_auto_subscribes_on_connect_and_resubscribes_after_reconnect() + { + var driver = new SubscribableStubDriver(); + var parent = CreateTestProbe(); + var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50))); + + // Desired set arrives BEFORE connect — retained, not yet applied. + actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions( + new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(100))); + + // Connecting → Connected triggers the auto-subscribe. + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(2)); + driver.LastSubscribedRefs.ShouldBe(new[] { "tag-a", "tag-b" }); + + // The auto-subscription is live — a data change reaches the parent. + driver.FireDataChange("tag-a", value: 7, statusCode: 0u); + parent.ExpectMsg(TimeSpan.FromSeconds(2)).Value.ShouldBe(7); + + // Reconnect → the desired set is re-established without any new host message. + actor.Tell(new DriverInstanceActor.DisconnectObserved("backend blip")); + AwaitCondition(() => driver.SubscribeCount >= 2, TimeSpan.FromSeconds(3)); + } + /// Verifies that subscribing to a non-ISubscribable driver replies with failure. [Fact] public async Task Subscribe_against_non_ISubscribable_replies_with_failure() @@ -266,13 +296,22 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase /// Gets the number of subscribers to OnDataChange. public int OnDataChangeSubscriberCount => OnDataChange?.GetInvocationList().Length ?? 0; + /// Number of times was called (re-subscribe asserts). + public int SubscribeCount; + /// The reference set passed to the most recent call. + public IReadOnlyList? LastSubscribedRefs; + /// Subscribes to the specified full references. /// The full references to subscribe to. /// The publishing interval. /// Cancellation token for the operation. public Task SubscribeAsync( IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) - => Task.FromResult(_handle); + { + Interlocked.Increment(ref SubscribeCount); + LastSubscribedRefs = fullReferences; + return Task.FromResult(_handle); + } /// Unsubscribes from the specified subscription handle. /// The subscription handle.