fix(runtime): VirtualTagHost watches children + respawns after unexpected death
Context.Watch each spawned child; OnChildTerminated evicts it from _children so the next ApplyVirtualTags (still containing that vtagId) falls through the ContainsKey guard and re-spawns a fresh VirtualTagActor. Adds a spawn-site Debug log, moves the TODO about in-place plan mutation to the skip-existing branch where it belongs, and adds a deterministic TestKit test (Child_is_respawned_after_unexpected_termination) that kills the first child, drains its UnregisterInterest from the mux probe, re-applies, and asserts a second distinct RegisterInterest arrives.
This commit is contained in:
@@ -64,6 +64,7 @@ public sealed class VirtualTagHostActor : ReceiveActor
|
|||||||
|
|
||||||
Receive<ApplyVirtualTags>(OnApply);
|
Receive<ApplyVirtualTags>(OnApply);
|
||||||
Receive<VirtualTagActor.EvaluationResult>(OnResult);
|
Receive<VirtualTagActor.EvaluationResult>(OnResult);
|
||||||
|
Receive<Terminated>(OnChildTerminated);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void OnApply(ApplyVirtualTags msg)
|
private void OnApply(ApplyVirtualTags msg)
|
||||||
@@ -93,6 +94,9 @@ public sealed class VirtualTagHostActor : ReceiveActor
|
|||||||
// a child whose plan changed (the diff already identifies ChangedEquipmentVirtualTags).
|
// a child whose plan changed (the diff already identifies ChangedEquipmentVirtualTags).
|
||||||
foreach (var p in msg.Plans)
|
foreach (var p in msg.Plans)
|
||||||
{
|
{
|
||||||
|
// TODO(equipment-virtualtags): when a plan's Expression/DependencyRefs change in place
|
||||||
|
// (ChangedEquipmentVirtualTags), stop+respawn the child here; today only spawn-new/stop-removed
|
||||||
|
// is handled (loader vtags are stable).
|
||||||
if (_children.ContainsKey(p.VirtualTagId)) continue;
|
if (_children.ContainsKey(p.VirtualTagId)) continue;
|
||||||
|
|
||||||
// Auto-name the child: vtagIds can contain characters illegal in actor names, so let Akka
|
// Auto-name the child: vtagIds can contain characters illegal in actor names, so let Akka
|
||||||
@@ -105,7 +109,9 @@ public sealed class VirtualTagHostActor : ReceiveActor
|
|||||||
publisherFactory: null,
|
publisherFactory: null,
|
||||||
dependencyRefs: p.DependencyRefs,
|
dependencyRefs: p.DependencyRefs,
|
||||||
mux: _mux));
|
mux: _mux));
|
||||||
|
Context.Watch(child);
|
||||||
_children[p.VirtualTagId] = child;
|
_children[p.VirtualTagId] = child;
|
||||||
|
_log.Debug("VirtualTagHost: spawned child for vtag {VirtualTagId}", p.VirtualTagId);
|
||||||
}
|
}
|
||||||
|
|
||||||
_log.Debug("VirtualTagHost: applied (desired={Desired}, children={Children})",
|
_log.Debug("VirtualTagHost: applied (desired={Desired}, children={Children})",
|
||||||
@@ -125,6 +131,18 @@ public sealed class VirtualTagHostActor : ReceiveActor
|
|||||||
nodeId, result.Value, OpcUaQuality.Good, result.TimestampUtc));
|
nodeId, result.Value, OpcUaQuality.Good, result.TimestampUtc));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void OnChildTerminated(Terminated msg)
|
||||||
|
{
|
||||||
|
var stale = _children.Where(kv => kv.Value.Equals(msg.ActorRef)).Select(kv => kv.Key).ToList();
|
||||||
|
foreach (var id in stale)
|
||||||
|
{
|
||||||
|
_children.Remove(id);
|
||||||
|
// NodeId map is rebuilt on the next ApplyVirtualTags; leaving the mapping is harmless
|
||||||
|
// (no child will publish for it until respawned). A dead child is respawned on next apply.
|
||||||
|
_log.Warning("VirtualTagHost: child for vtag {VirtualTagId} terminated; will respawn on next apply", id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Folder-scoped NodeId for a VirtualTag plan — MUST match
|
/// <summary>Folder-scoped NodeId for a VirtualTag plan — MUST match
|
||||||
/// <c>Phase7Applier.MaterialiseEquipmentVirtualTags</c> exactly, or the published value lands on a
|
/// <c>Phase7Applier.MaterialiseEquipmentVirtualTags</c> exactly, or the published value lands on a
|
||||||
/// NodeId that was never materialised.</summary>
|
/// NodeId that was never materialised.</summary>
|
||||||
|
|||||||
@@ -115,6 +115,44 @@ public sealed class VirtualTagHostActorTests : RuntimeActorTestBase
|
|||||||
publish.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
publish.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// After a child actor terminates unexpectedly, a subsequent ApplyVirtualTags (still containing
|
||||||
|
/// that vtag) must re-spawn it. Proof: two distinct RegisterInterest messages arrive at the mux
|
||||||
|
/// probe — one for the original child and one for the replacement.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public void Child_is_respawned_after_unexpected_termination()
|
||||||
|
{
|
||||||
|
var publish = CreateTestProbe();
|
||||||
|
var mux = CreateTestProbe();
|
||||||
|
var host = Sys.ActorOf(VirtualTagHostActor.Props(publish.Ref, mux.Ref, new StubEvaluator()));
|
||||||
|
var plan = new[] { Plan("vt-1", "eq-1", "speed-rpm") };
|
||||||
|
|
||||||
|
// First apply — child self-registers; capture the child ref from the message sender.
|
||||||
|
host.Tell(new VirtualTagHostActor.ApplyVirtualTags(plan));
|
||||||
|
mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
|
||||||
|
var firstChild = mux.LastSender;
|
||||||
|
|
||||||
|
// Watch the child from the test side so we can await its death deterministically before
|
||||||
|
// re-applying, avoiding any race between Terminated delivery to the host and the re-apply.
|
||||||
|
Watch(firstChild);
|
||||||
|
Sys.Stop(firstChild);
|
||||||
|
ExpectTerminated(firstChild);
|
||||||
|
|
||||||
|
// The dying child's PostStop sends UnregisterInterest to the mux — drain it so the mux probe
|
||||||
|
// mailbox is clean before we look for the new RegisterInterest.
|
||||||
|
mux.ExpectMsg<DependencyMuxActor.UnregisterInterest>(TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
// Re-apply with the same plan — host should see vt-1 absent from _children and spawn fresh.
|
||||||
|
host.Tell(new VirtualTagHostActor.ApplyVirtualTags(plan));
|
||||||
|
var reg2 = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(TimeSpan.FromSeconds(5));
|
||||||
|
reg2.TagRefs.ShouldContain("a");
|
||||||
|
|
||||||
|
// The new child must be a different actor ref than the one we killed.
|
||||||
|
var secondChild = mux.LastSender;
|
||||||
|
secondChild.ShouldNotBe(firstChild);
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Deterministic no-op evaluator: keeps spawned children inert so tests drive the host's
|
/// <summary>Deterministic no-op evaluator: keeps spawned children inert so tests drive the host's
|
||||||
/// OnResult path directly via synthetic EvaluationResults.</summary>
|
/// OnResult path directly via synthetic EvaluationResults.</summary>
|
||||||
private sealed class StubEvaluator : IVirtualTagEvaluator
|
private sealed class StubEvaluator : IVirtualTagEvaluator
|
||||||
|
|||||||
Reference in New Issue
Block a user