feat(runtime): #113 DependencyMuxActor — drivers → virtual-tag fan-out
Some checks failed
v2-ci / build (push) Failing after 36s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (push) Has been skipped

End-to-end data path is now wired on the read side: driver subscriptions
fire AttributeValuePublished → DriverHostActor → DependencyMuxActor →
DependencyValueChanged to every interested VirtualTagActor. Previously
the publish hit a dead-letter at the host.

DependencyMuxActor:
  - Per-node fan-out router. Maintains tagRef → Set<IActorRef> with a
    reverse subscriber → refs index so unregister/replace are O(refs).
  - Watches subscribers; Terminated triggers automatic unregister so
    dead virtual-tag actors stop receiving publishes.
  - Re-register replaces the prior interest set — no stale-ref leaks
    on actor restart.
  - Drops publishes for refs with no interested subscribers.

VirtualTagActor:
  - New Props params: dependencyRefs + mux ActorRef.
  - PreStart sends RegisterInterest to the mux; PostStop sends
    UnregisterInterest. Default both null so older callers stay quiet.

DriverHostActor:
  - New dependencyMux Props param. Steady + Applying states now
    receive AttributeValuePublished from their DriverInstance children
    and forward to the mux. Null mux is a no-op (dev/Mac).

ServiceCollectionExtensions:
  - WithOtOpcUaRuntimeActors spawns DependencyMuxActor before
    DriverHostActor and threads its ActorRef into the host's Props.
    New DependencyMuxActorKey + DependencyMuxActorName.

Tests: Runtime 57 -> 63 (+6):
- Mux forwards to only subscribers interested in each ref
- Publish for unregistered ref is dropped silently
- Unregister stops forwarding
- Re-register replaces prior interest set
- VirtualTagActor PreStart registration drives end-to-end eval
  (uses AwaitAssert to race-safely settle the PreStart Tell)
- DriverHostActor forwards AttributeValuePublished through to mux

All 6 v2 test suites green: 163 tests passing.

F8 (#79) state updated — dep subscribe seam shipped, Core.VirtualTags
production engine binding (compile + ITagUpstreamSource subscribe) is
the residual.
This commit is contained in:
Joseph Doherty
2026-05-26 09:43:06 -04:00
parent f427dc4f26
commit 7fa863f6da
6 changed files with 317 additions and 8 deletions

View File

@@ -41,6 +41,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
private readonly IActorRef? _coordinatorOverride;
private readonly IDriverFactory _driverFactory;
private readonly IReadOnlySet<string> _localRoles;
private readonly IActorRef? _dependencyMux;
private readonly ILoggingAdapter _log = Context.GetLogger();
private RevisionHash? _currentRevision;
@@ -63,21 +64,25 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
CommonsNodeId localNode,
IActorRef? coordinator = null,
IDriverFactory? driverFactory = null,
IReadOnlySet<string>? localRoles = null) =>
Akka.Actor.Props.Create(() => new DriverHostActor(dbFactory, localNode, coordinator, driverFactory, localRoles));
IReadOnlySet<string>? localRoles = null,
IActorRef? dependencyMux = null) =>
Akka.Actor.Props.Create(() => new DriverHostActor(
dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux));
public DriverHostActor(
IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
CommonsNodeId localNode,
IActorRef? coordinator,
IDriverFactory? driverFactory = null,
IReadOnlySet<string>? localRoles = null)
IReadOnlySet<string>? localRoles = null,
IActorRef? dependencyMux = null)
{
_dbFactory = dbFactory;
_localNode = localNode;
_coordinatorOverride = coordinator;
_driverFactory = driverFactory ?? NullDriverFactory.Instance;
_localRoles = localRoles ?? new HashSet<string>(StringComparer.Ordinal);
_dependencyMux = dependencyMux;
// Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply.
Become(Steady);
@@ -150,6 +155,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
{
Receive<DispatchDeployment>(HandleDispatchFromSteady);
Receive<GetDiagnostics>(HandleGetDiagnostics);
Receive<DriverInstanceActor.AttributeValuePublished>(ForwardToMux);
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
}
@@ -168,9 +174,18 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
Self.Forward(msg); // re-deliver after we transition back
});
Receive<GetDiagnostics>(HandleGetDiagnostics);
Receive<DriverInstanceActor.AttributeValuePublished>(ForwardToMux);
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
}
private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg)
{
// Pass driver-published values to the dependency mux when one is wired. Without a mux,
// VirtualTagActor evaluation can't fire — values just drop here. That's the dev/Mac path
// (no virtual tags registered); production binds the mux via the RuntimeActors extension.
_dependencyMux?.Tell(msg);
}
private void Stale()
{
Receive<DispatchDeployment>(_ =>

View File

@@ -10,6 +10,7 @@ using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Runtime;
@@ -20,6 +21,7 @@ public static class ServiceCollectionExtensions
public const string DriverHostActorName = "driver-host";
public const string DbHealthProbeActorName = "db-health";
public const string HistorianAdapterActorName = "historian-adapter";
public const string DependencyMuxActorName = "dependency-mux";
/// <summary>
/// Registers shared runtime services. Currently binds <see cref="IAlarmHistorianSink"/>
@@ -65,9 +67,15 @@ public static class ServiceCollectionExtensions
DbHealthProbeActorName);
registry.Register<DbHealthProbeActorKey>(dbHealth);
// Dependency mux must be spawned before DriverHostActor so the host can forward
// AttributeValuePublished into it from the very first driver spawn.
var mux = system.ActorOf(DependencyMuxActor.Props(), DependencyMuxActorName);
registry.Register<DependencyMuxActorKey>(mux);
var driverHost = system.ActorOf(
DriverHostActor.Props(dbFactory, roleInfo.LocalNode, coordinator: null,
driverFactory: driverFactory, localRoles: roleInfo.LocalRoles),
driverFactory: driverFactory, localRoles: roleInfo.LocalRoles,
dependencyMux: mux),
DriverHostActorName);
registry.Register<DriverHostActorKey>(driverHost);
@@ -85,3 +93,4 @@ public static class ServiceCollectionExtensions
public sealed class DriverHostActorKey { }
public sealed class DbHealthProbeActorKey { }
public sealed class HistorianAdapterActorKey { }
public sealed class DependencyMuxActorKey { }

View File

@@ -0,0 +1,107 @@
using Akka.Actor;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
namespace ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
/// <summary>
/// Per-node fan-out router from <see cref="DriverInstanceActor.AttributeValuePublished"/>
/// to interested <see cref="VirtualTagActor"/> instances. VirtualTagActor sends
/// <see cref="RegisterInterest"/> on start-up listing the tag refs it depends on; the mux
/// keeps a map of <c>tagRef → Set&lt;IActorRef&gt;</c> and on every AttributeValuePublished
/// forwards a <see cref="VirtualTagActor.DependencyValueChanged"/> to each interested
/// subscriber.
///
/// DriverHostActor forwards every <c>AttributeValuePublished</c> it receives from its
/// DriverInstanceActor children to this mux (one mux per driver-role node). The mux is
/// deliberately not a DPS subscriber — virtual-tag evaluation is local to each node and
/// would over-emit if it spanned the cluster.
/// </summary>
public sealed class DependencyMuxActor : ReceiveActor
{
public const string ActorName = "dependency-mux";
/// <summary>Register a subscriber's interest in a set of tag refs. Idempotent on re-register —
/// the second call replaces the prior interest set for that subscriber.</summary>
public sealed record RegisterInterest(IReadOnlyList<string> TagRefs, IActorRef Subscriber);
/// <summary>Unregister every interest held by <see cref="Subscriber"/>. Sent on PostStop by
/// the subscriber, or by Terminated handling when the mux watches.</summary>
public sealed record UnregisterInterest(IActorRef Subscriber);
private readonly Dictionary<string, HashSet<IActorRef>> _byRef = new(StringComparer.Ordinal);
private readonly Dictionary<IActorRef, HashSet<string>> _bySubscriber = new();
private readonly ILoggingAdapter _log = Context.GetLogger();
public static Props Props() => Akka.Actor.Props.Create<DependencyMuxActor>();
public DependencyMuxActor()
{
Receive<RegisterInterest>(OnRegister);
Receive<UnregisterInterest>(msg => RemoveSubscriber(msg.Subscriber));
Receive<DriverInstanceActor.AttributeValuePublished>(OnAttributeValuePublished);
Receive<Terminated>(msg => RemoveSubscriber(msg.ActorRef));
}
private void OnRegister(RegisterInterest msg)
{
// Replace any prior interest set so re-registers on actor restart don't leak old refs.
if (_bySubscriber.TryGetValue(msg.Subscriber, out var priorRefs))
{
foreach (var r in priorRefs)
{
if (_byRef.TryGetValue(r, out var set))
{
set.Remove(msg.Subscriber);
if (set.Count == 0) _byRef.Remove(r);
}
}
}
var refs = new HashSet<string>(msg.TagRefs, StringComparer.Ordinal);
_bySubscriber[msg.Subscriber] = refs;
foreach (var r in refs)
{
if (!_byRef.TryGetValue(r, out var set))
{
set = new HashSet<IActorRef>();
_byRef[r] = set;
}
set.Add(msg.Subscriber);
}
Context.Watch(msg.Subscriber);
_log.Debug("DependencyMux: subscriber {Sub} registered for {Count} refs", msg.Subscriber, refs.Count);
}
private void RemoveSubscriber(IActorRef subscriber)
{
if (!_bySubscriber.TryGetValue(subscriber, out var refs)) return;
foreach (var r in refs)
{
if (_byRef.TryGetValue(r, out var set))
{
set.Remove(subscriber);
if (set.Count == 0) _byRef.Remove(r);
}
}
_bySubscriber.Remove(subscriber);
Context.Unwatch(subscriber);
_log.Debug("DependencyMux: subscriber {Sub} removed", subscriber);
}
private void OnAttributeValuePublished(DriverInstanceActor.AttributeValuePublished msg)
{
if (!_byRef.TryGetValue(msg.FullReference, out var subscribers) || subscribers.Count == 0)
{
// No virtual tag cares about this ref — drop. Common in normal operation; the address
// space carries thousands of tags and only a fraction feed virtual-tag expressions.
return;
}
var dep = new VirtualTagActor.DependencyValueChanged(msg.FullReference, msg.Value, msg.TimestampUtc);
foreach (var sub in subscribers)
{
sub.Tell(dep);
}
}
}

View File

@@ -26,6 +26,8 @@ public sealed class VirtualTagActor : ReceiveActor
private readonly string _expression;
private readonly IVirtualTagEvaluator _evaluator;
private readonly Func<DPSPublisher>? _publisherFactory;
private readonly IReadOnlyList<string> _dependencyRefs;
private readonly IActorRef? _mux;
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly Dictionary<string, object?> _dependencies = new(StringComparer.Ordinal);
@@ -37,29 +39,50 @@ public sealed class VirtualTagActor : ReceiveActor
string expression,
IVirtualTagEvaluator? evaluator = null,
string? scriptId = null,
Func<DPSPublisher>? publisherFactory = null) =>
Func<DPSPublisher>? publisherFactory = null,
IReadOnlyList<string>? dependencyRefs = null,
IActorRef? mux = null) =>
Akka.Actor.Props.Create(() => new VirtualTagActor(
virtualTagId, expression,
evaluator ?? NullVirtualTagEvaluator.Instance,
scriptId ?? virtualTagId,
publisherFactory));
publisherFactory,
dependencyRefs ?? Array.Empty<string>(),
mux));
public VirtualTagActor(
string virtualTagId,
string expression,
IVirtualTagEvaluator evaluator,
string scriptId,
Func<DPSPublisher>? publisherFactory)
Func<DPSPublisher>? publisherFactory,
IReadOnlyList<string> dependencyRefs,
IActorRef? mux)
{
_virtualTagId = virtualTagId;
_scriptId = scriptId;
_expression = expression;
_evaluator = evaluator;
_publisherFactory = publisherFactory;
_dependencyRefs = dependencyRefs;
_mux = mux;
Receive<DependencyValueChanged>(OnDependencyChanged);
}
protected override void PreStart()
{
if (_mux is not null && _dependencyRefs.Count > 0)
{
_mux.Tell(new DependencyMuxActor.RegisterInterest(_dependencyRefs, Self));
}
}
protected override void PostStop()
{
_mux?.Tell(new DependencyMuxActor.UnregisterInterest(Self));
}
private void OnDependencyChanged(DependencyValueChanged msg)
{
_dependencies[msg.TagId] = msg.Value;