feat(runtime): wire driver SubscribeBulk pass so tag values stream
v2-ci / build (push) Failing after 51s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped

Materialised SystemPlatform/Galaxy variables previously stayed
BadWaitingForInitialData because nothing told the driver to subscribe
(OpcUaPublishActor TODO 'on a future SubscribeBulk pass') and published
values were only forwarded to the VirtualTag mux, never the OPC UA sink.

DriverHostActor now, after each apply, groups the deployment's galaxy tag
MXAccess refs by driver and sends DriverInstanceActor.SetDesiredSubscriptions;
the actor retains the set and (re)subscribes on every Connected entry, so
values resume after reconnects/redeploys (closes the F8b/#113 gap). Published
values are also forwarded to OpcUaPublishActor as AttributeValueUpdate
(NodeId == galaxy MxAccessRef) so the materialised variable shows live data.

Verified live in docker-dev: galaxy TestMachine_001 tags go Good with a
changing TestChangingInt. +1 unit test.
This commit is contained in:
Joseph Doherty
2026-06-06 12:31:55 -04:00
parent 83b8d75112
commit c1ce5833e9
3 changed files with 161 additions and 4 deletions
@@ -13,6 +13,7 @@ using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
@@ -40,6 +41,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
public const string DriverControlTopic = ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin.DriverControlTopic.Name;
public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30);
/// <summary>Publishing interval handed to each driver's SubscribeBulk pass after an apply.</summary>
private static readonly TimeSpan SubscriptionPublishingInterval = TimeSpan.FromSeconds(1);
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
private readonly CommonsNodeId _localNode;
private readonly IActorRef? _coordinatorOverride;
@@ -224,10 +228,17 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg)
{
// Pass driver-published values to the dependency mux when one is wired. Without a mux,
// VirtualTagActor evaluation can't fire — values just drop here. That's the dev/Mac path
// (no virtual tags registered); production binds the mux via the RuntimeActors extension.
// Pass driver-published values to the dependency mux when one is wired (VirtualTag inputs).
// Without a mux, VirtualTagActor evaluation can't fire — that's the dev/Mac path (no virtual
// tags registered); production binds the mux via the RuntimeActors extension.
_dependencyMux?.Tell(msg);
// Also push the value to the OPC UA sink so the materialised variable reflects live data
// instead of staying BadWaitingForInitialData. For SystemPlatform / Galaxy tags the variable
// NodeId is exactly the dot-form MXAccess reference the driver subscribed to, so the published
// FullReference maps straight onto the sink NodeId.
_opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AttributeValueUpdate(
msg.FullReference, msg.Value, msg.Quality, msg.TimestampUtc));
}
private void Stale()
@@ -300,6 +311,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
// composition. The publish actor handles the load-compose-diff-apply pipeline; we
// just forward the same correlation id so the audit trail joins up.
_opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation));
// SubscribeBulk pass: hand each driver its desired tag references so live values flow into
// the just-rebuilt address space instead of staying BadWaitingForInitialData.
PushDesiredSubscriptions(deploymentId);
OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair<string, object?>("outcome", "ack"));
_log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})",
_localNode, deploymentId, revision, _children.Count);
@@ -357,6 +371,66 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
foreach (var spec in plan.ToSpawn) SpawnChild(spec);
}
/// <summary>
/// SubscribeBulk pass. After an apply, read the deployment's SystemPlatform / Galaxy tags,
/// group their dot-form MXAccess references by driver instance, and hand each running driver
/// child its desired subscription set via <see cref="DriverInstanceActor.SetDesiredSubscriptions"/>.
/// The child retains the set and (re)subscribes on every Connected entry, so values stream into
/// the OPC UA sink and resume after reconnects. Drivers with no configured tags get an empty set
/// (which clears any stale subscription from a previous deployment).
/// </summary>
private void PushDesiredSubscriptions(DeploymentId deploymentId)
{
byte[] blob;
try
{
using var db = _dbFactory.CreateDbContext();
blob = db.Deployments.AsNoTracking()
.Where(d => d.DeploymentId == deploymentId.Value)
.Select(d => d.ArtifactBlob)
.FirstOrDefault() ?? Array.Empty<byte>();
}
catch (Exception ex)
{
_log.Warning(ex, "DriverHost {Node}: failed to load artifact for SubscribeBulk ({Id})", _localNode, deploymentId);
return;
}
Phase7CompositionResult composition;
try
{
composition = DeploymentArtifact.ParseComposition(blob);
}
catch (Exception ex)
{
_log.Warning(ex, "DriverHost {Node}: failed to parse composition for SubscribeBulk ({Id})", _localNode, deploymentId);
return;
}
var refsByDriver = composition.GalaxyTags
.GroupBy(t => t.DriverInstanceId, StringComparer.Ordinal)
.ToDictionary(
g => g.Key,
g => (IReadOnlyList<string>)g.Select(t => t.MxAccessRef)
.Distinct(StringComparer.Ordinal)
.ToArray(),
StringComparer.Ordinal);
var total = 0;
foreach (var (driverId, entry) in _children)
{
var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty<string>();
entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval));
total += refs.Count;
}
if (total > 0)
{
_log.Info("DriverHost {Node}: SubscribeBulk pushed {Refs} references across {Drivers} driver(s)",
_localNode, total, refsByDriver.Count);
}
}
private void SpawnChild(DriverInstanceSpec spec)
{
var stub = DriverInstanceActor.ShouldStub(spec.DriverType, _localRoles);
@@ -41,6 +41,15 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
public sealed record WriteAttribute(string TagId, object Value);
public sealed record WriteAttributeResult(bool Success, string? Reason);
public sealed record Subscribe(IReadOnlyList<string> FullReferences, TimeSpan PublishingInterval);
/// <summary>
/// Sets the set of references this driver should keep subscribed for the lifetime of the
/// current deployment. Unlike the one-shot <see cref="Subscribe"/>, the desired set is
/// retained and (re)established automatically every time the actor (re)enters
/// <c>Connected</c> — closing the F8b/#113 "re-subscribe across reconnects" gap and giving
/// <see cref="DriverHostActor"/> a single message to drive the SubscribeBulk pass after an
/// apply. Sending an empty set clears the desired subscription.
/// </summary>
public sealed record SetDesiredSubscriptions(IReadOnlyList<string> FullReferences, TimeSpan PublishingInterval);
public sealed record SubscriptionEstablished(string DiagnosticId, int ReferenceCount);
public sealed record SubscriptionFailed(string Reason);
public sealed record Unsubscribe;
@@ -85,6 +94,11 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
private ISubscriptionHandle? _subscriptionHandle;
private EventHandler<DataChangeEventArgs>? _dataChangeHandler;
/// <summary>The references the host wants kept subscribed (set by <see cref="SetDesiredSubscriptions"/>).
/// Re-applied on every entry into <c>Connected</c> so values resume after a reconnect or redeploy.</summary>
private IReadOnlyList<string> _desiredRefs = Array.Empty<string>();
private TimeSpan _desiredInterval = TimeSpan.FromSeconds(1);
/// <summary>
/// Gets or sets the timer scheduler for scheduling reconnection attempts.
/// </summary>
@@ -189,6 +203,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
Receive<WriteAttribute>(_ => Sender.Tell(new WriteAttributeResult(true, "stubbed")));
Receive<DisconnectObserved>(_ => { /* stubbed drivers don't disconnect */ });
Receive<ForceReconnect>(_ => { /* stubbed drivers don't reconnect */ });
Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
}
@@ -200,6 +215,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
_log.Info("DriverInstance {Id}: connected", _driverInstanceId);
Become(Connected);
PublishHealthSnapshot();
ResubscribeDesired();
});
Receive<InitializeFailed>(msg =>
{
@@ -208,6 +224,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
Become(Reconnecting);
PublishHealthSnapshot();
});
Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
Receive<ForceReconnect>(_ => { /* already connecting — no-op */ });
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
}
@@ -234,6 +251,12 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
ReceiveAsync<WriteAttribute>(HandleWriteAsync);
ReceiveAsync<Subscribe>(HandleSubscribeAsync);
ReceiveAsync<Unsubscribe>(_ => UnsubscribeAsync());
Receive<SetDesiredSubscriptions>(msg =>
{
StoreDesiredSubscriptions(msg);
if (_desiredRefs.Count > 0) Self.Tell(new Subscribe(_desiredRefs, _desiredInterval));
else if (_subscriptionHandle is not null) Self.Tell(new Unsubscribe());
});
Receive<DataChangeForward>(OnDataChangeForward);
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
}
@@ -247,8 +270,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
_log.Info("DriverInstance {Id}: reconnected", _driverInstanceId);
Become(Connected);
PublishHealthSnapshot();
ResubscribeDesired();
});
Receive<InitializeFailed>(_ => { /* keep retrying via timer */ });
Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
Receive<ForceReconnect>(_ => { /* already reconnecting — no-op */ });
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval);
@@ -393,6 +418,25 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
_subscriptionHandle = null;
}
/// <summary>Records the host's desired subscription set without touching the live subscription.
/// The set is (re)applied by <see cref="ResubscribeDesired"/> on the next <c>Connected</c> entry.</summary>
private void StoreDesiredSubscriptions(SetDesiredSubscriptions msg)
{
_desiredRefs = msg.FullReferences;
_desiredInterval = msg.PublishingInterval;
}
/// <summary>Re-establish the desired subscription after (re)connecting. Self-sends the one-shot
/// <see cref="Subscribe"/> the Connected behaviour already handles (which drops any prior handle
/// first), so values resume streaming after a reconnect or redeploy without host involvement.</summary>
private void ResubscribeDesired()
{
if (_desiredRefs.Count > 0)
{
Self.Tell(new Subscribe(_desiredRefs, _desiredInterval));
}
}
private void OnDataChangeForward(DataChangeForward msg)
{
var quality = QualityFromStatus(msg.Snapshot.StatusCode);
@@ -152,6 +152,36 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
parent.ExpectMsg<DriverInstanceActor.AttributeValuePublished>().Quality.ShouldBe(OpcUaQuality.Bad);
}
/// <summary>
/// Verifies the SubscribeBulk pass: SetDesiredSubscriptions retains the ref set and the actor
/// auto-subscribes when it (re)enters Connected — including a re-subscribe after a reconnect,
/// closing the F8b/#113 gap that previously left galaxy variables at BadWaitingForInitialData.
/// </summary>
[Fact]
public async Task SetDesiredSubscriptions_auto_subscribes_on_connect_and_resubscribes_after_reconnect()
{
var driver = new SubscribableStubDriver();
var parent = CreateTestProbe();
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50)));
// Desired set arrives BEFORE connect — retained, not yet applied.
actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(
new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(100)));
// Connecting → Connected triggers the auto-subscribe.
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(2));
driver.LastSubscribedRefs.ShouldBe(new[] { "tag-a", "tag-b" });
// The auto-subscription is live — a data change reaches the parent.
driver.FireDataChange("tag-a", value: 7, statusCode: 0u);
parent.ExpectMsg<DriverInstanceActor.AttributeValuePublished>(TimeSpan.FromSeconds(2)).Value.ShouldBe(7);
// Reconnect → the desired set is re-established without any new host message.
actor.Tell(new DriverInstanceActor.DisconnectObserved("backend blip"));
AwaitCondition(() => driver.SubscribeCount >= 2, TimeSpan.FromSeconds(3));
}
/// <summary>Verifies that subscribing to a non-ISubscribable driver replies with failure.</summary>
[Fact]
public async Task Subscribe_against_non_ISubscribable_replies_with_failure()
@@ -266,13 +296,22 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
/// <summary>Gets the number of subscribers to OnDataChange.</summary>
public int OnDataChangeSubscriberCount => OnDataChange?.GetInvocationList().Length ?? 0;
/// <summary>Number of times <see cref="SubscribeAsync"/> was called (re-subscribe asserts).</summary>
public int SubscribeCount;
/// <summary>The reference set passed to the most recent <see cref="SubscribeAsync"/> call.</summary>
public IReadOnlyList<string>? LastSubscribedRefs;
/// <summary>Subscribes to the specified full references.</summary>
/// <param name="fullReferences">The full references to subscribe to.</param>
/// <param name="publishingInterval">The publishing interval.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
=> Task.FromResult<ISubscriptionHandle>(_handle);
{
Interlocked.Increment(ref SubscribeCount);
LastSubscribedRefs = fullReferences;
return Task.FromResult<ISubscriptionHandle>(_handle);
}
/// <summary>Unsubscribes from the specified subscription handle.</summary>
/// <param name="handle">The subscription handle.</param>