From ed130135cae8c9a7b94e07494327be442fea2828 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 26 May 2026 05:02:42 -0400 Subject: [PATCH] feat(runtime): DriverHostActor state machine with PreStart recovery + DispatchDeployment + stale fallback --- ZB.MOM.WW.OtOpcUa.slnx | 1 + .../Drivers/DriverHostActor.cs | 284 ++++++++++++++++++ .../Drivers/DriverHostActorTests.cs | 143 +++++++++ .../Harness/RuntimeActorTestBase.cs | 59 ++++ .../ZB.MOM.WW.OtOpcUa.Runtime.Tests.csproj | 34 +++ 5 files changed, 521 insertions(+) create mode 100644 src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorTests.cs create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Harness/RuntimeActorTestBase.cs create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ZB.MOM.WW.OtOpcUa.Runtime.Tests.csproj diff --git a/ZB.MOM.WW.OtOpcUa.slnx b/ZB.MOM.WW.OtOpcUa.slnx index 42b71f9..dc88700 100644 --- a/ZB.MOM.WW.OtOpcUa.slnx +++ b/ZB.MOM.WW.OtOpcUa.slnx @@ -64,6 +64,7 @@ + diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs new file mode 100644 index 0000000..6908d16 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -0,0 +1,284 @@ +using Akka.Actor; +using Akka.Cluster.Tools.PublishSubscribe; +using Akka.Event; +using Microsoft.EntityFrameworkCore; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers; + +/// +/// Per-node supervisor that receives DispatchDeployment messages from the admin-role +/// coordinator and applies the deployment locally. 
Three Become states:
+/// <list type="bullet">
+///   <item><b>Bootstrapping</b> — PreStart only. Reads the latest <c>NodeDeploymentState</c> for self;
+///   chooses next state.</item>
+///   <item><b>Steady(rev)</b> — caught up. Idempotent on same-rev dispatch (immediate ApplyAck.Applied).
+///   New rev → transitions to Applying.</item>
+///   <item><b>Applying(id)</b> — applying a delta. Defers further dispatches by re-sending them to self.</item>
+///   <item><b>Stale</b> — ConfigDb unreachable on bootstrap. Background reconnect loop tries to advance.</item>
+/// </list>
+/// <para>
+/// Children (DriverInstance/VirtualTag/etc.) are spawned in Phase 6 follow-up tasks (41-44).
+/// For now the dispatch handler treats the apply as a no-op and writes the ACK back.
+/// </para>
+public sealed class DriverHostActor : ReceiveActor, IWithTimers
+{
+    /// <summary>DistributedPubSub topic subscribed to in PreStart; also used for the ACK fallback publish.</summary>
+    public const string DeploymentsTopic = "deployments";
+
+    /// <summary>Period of the ConfigDb reconnect timer that runs while the actor is Stale.</summary>
+    public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30);
+
+    // One key for the reconnect timer so the start (Stale) and cancel (TryRecoverFromStale) sites cannot drift apart.
+    private const string RetryTimerKey = "retry-db";
+
+    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
+    private readonly CommonsNodeId _localNode;
+    private readonly IActorRef? _coordinatorOverride;
+    private readonly ILoggingAdapter _log = Context.GetLogger();
+
+    // Revision this node considers applied; null until a deployment was recovered or applied.
+    private RevisionHash? _currentRevision;
+
+    // Deployment currently inside ApplyAndAck; null outside of an apply.
+    private DeploymentId? _applyingDeploymentId;
+
+    /// <summary>Timer scheduler required by <see cref="IWithTimers"/>.</summary>
+    public ITimerScheduler Timers { get; set; } = null!;
+
+    /// <summary>Timer tick that asks a Stale actor to probe the ConfigDb again.</summary>
+    public sealed class RetryConfigDbConnection
+    {
+        public static readonly RetryConfigDbConnection Instance = new();
+        private RetryConfigDbConnection() { }
+    }
+
+    /// <summary>Creates the <see cref="Akka.Actor.Props"/> for this actor.</summary>
+    /// <param name="dbFactory">Factory for short-lived ConfigDb contexts.</param>
+    /// <param name="localNode">Identity of the node this actor runs on.</param>
+    /// <param name="coordinator">Optional direct ACK target; when null, ACKs are published on <see cref="DeploymentsTopic"/>.</param>
+    public static Props Props(
+        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
+        CommonsNodeId localNode,
+        IActorRef? coordinator = null) =>
+        Akka.Actor.Props.Create(() => new DriverHostActor(dbFactory, localNode, coordinator));
+
+    /// <summary>Stores the dependencies and installs Steady as the initial behavior.</summary>
+    public DriverHostActor(
+        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
+        CommonsNodeId localNode,
+        IActorRef? coordinator)
+    {
+        _dbFactory = dbFactory;
+        _localNode = localNode;
+        _coordinatorOverride = coordinator;
+
+        // Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply.
+        Become(Steady);
+    }
+
+    /// <summary>Subscribes to the deployments topic, then recovers state from the ConfigDb.</summary>
+    protected override void PreStart()
+    {
+        // Subscribe to deployments topic so the coordinator's broadcast lands here.
+        DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DeploymentsTopic, Self));
+        Bootstrap();
+    }
+
+    /// <summary>
+    /// Picks the start-up behavior from the newest NodeDeploymentState row of this node
+    /// (newest by StartedAtUtc).
+    /// </summary>
+    private void Bootstrap()
+    {
+        // Read the most-recent NodeDeploymentState for this node; if it's Applied, jump
+        // to Steady with that revision. If Applying (orphan from a crash), discard and replay.
+        // If the DB is unreachable, fall back to Stale and start the reconnect loop.
+        try
+        {
+            using var db = _dbFactory.CreateDbContext();
+            var latest = db.NodeDeploymentStates
+                .Where(s => s.NodeId == _localNode.Value)
+                .OrderByDescending(s => s.StartedAtUtc)
+                .Select(s => new { s.DeploymentId, s.Status, s.StartedAtUtc })
+                .FirstOrDefault();
+
+            if (latest is null)
+            {
+                _log.Info("DriverHost {Node}: no prior deployments; entering Steady (no revision)", _localNode);
+                Become(Steady);
+                return;
+            }
+
+            // The state row only carries the deployment id; the revision lives on the Deployment row.
+            var deployment = db.Deployments
+                .AsNoTracking()
+                .FirstOrDefault(d => d.DeploymentId == latest.DeploymentId);
+            var revision = deployment is null
+                ? (RevisionHash?)null
+                : RevisionHash.Parse(deployment.RevisionHash);
+
+            switch (latest.Status)
+            {
+                case NodeDeploymentStatus.Applied:
+                    _currentRevision = revision;
+                    _log.Info("DriverHost {Node}: recovered Applied state at rev {Rev}", _localNode, revision);
+                    Become(Steady);
+                    break;
+                case NodeDeploymentStatus.Applying:
+                    _log.Warning("DriverHost {Node}: found orphan Applying row for deployment {Id}; replaying",
+                        _localNode, latest.DeploymentId);
+                    // Without a Deployment row there is no revision to replay, so just settle in Steady.
+                    if (revision is not null) ApplyAndAck(new DeploymentId(latest.DeploymentId), revision.Value, CorrelationId.NewId());
+                    else Become(Steady);
+                    break;
+                case NodeDeploymentStatus.Failed:
+                default:
+                    _log.Info("DriverHost {Node}: prior deployment {Id} failed; entering Steady at last known rev",
+                        _localNode, latest.DeploymentId);
+                    Become(Steady);
+                    break;
+            }
+        }
+        catch (Exception ex)
+        {
+            // NOTE(review): this catches every exception from the block above, not only connectivity
+            // failures, so any other failure is also logged as "ConfigDb unreachable" — confirm intended.
+            _log.Warning(ex, "DriverHost {Node}: ConfigDb unreachable on bootstrap; entering Stale", _localNode);
+            Become(Stale);
+        }
+    }
+
+    /// <summary>Steady behavior: handles dispatches and swallows the PubSub subscribe ack.</summary>
+    private void Steady()
+    {
+        Receive<DispatchDeployment>(HandleDispatchFromSteady);
+        Receive<SubscribeAck>(_ => { /* PubSub ack */ });
+    }
+
+    /// <summary>Applying behavior: drops duplicates of the in-flight deployment, re-sends anything else to self.</summary>
+    private void Applying()
+    {
+        Receive<DispatchDeployment>(msg =>
+        {
+            if (_applyingDeploymentId is not null && msg.DeploymentId == _applyingDeploymentId.Value)
+            {
+                _log.Debug("DriverHost {Node}: duplicate DispatchDeployment for in-flight {Id}; ignoring",
+                    _localNode, msg.DeploymentId);
+                return;
+            }
+            _log.Info("DriverHost {Node}: dispatch for {Id} received while still applying {Cur}; deferring",
+                _localNode, msg.DeploymentId, _applyingDeploymentId);
+            // NOTE(review): ApplyAndAck currently enters and leaves Applying within one message, so this
+            // handler does not look reachable today. If apply becomes asynchronous, Forward-to-self will
+            // re-handle the message here until the state changes — consider a stash instead.
+            Self.Forward(msg); // re-deliver after we transition back
+        });
+        Receive<SubscribeAck>(_ => { /* PubSub ack */ });
+    }
+
+    /// <summary>
+    /// Stale behavior: dispatches are logged and dropped without an ACK; a periodic
+    /// <see cref="RetryConfigDbConnection"/> tick probes the ConfigDb.
+    /// </summary>
+    private void Stale()
+    {
+        Receive<DispatchDeployment>(_ =>
+        {
+            _log.Warning("DriverHost {Node}: ignoring DispatchDeployment while Stale (DB unreachable)", _localNode);
+        });
+        Receive<RetryConfigDbConnection>(_ => TryRecoverFromStale());
+        Receive<SubscribeAck>(_ => { /* PubSub ack */ });
+        Timers.StartPeriodicTimer(RetryTimerKey, RetryConfigDbConnection.Instance, ReconnectInterval);
+    }
+
+    /// <summary>ACKs immediately when the dispatched revision equals the current one; otherwise applies it.</summary>
+    private void HandleDispatchFromSteady(DispatchDeployment msg)
+    {
+        if (_currentRevision is { } cur && cur == msg.RevisionHash)
+        {
+            // Idempotent — already at this rev. Ack and stay Steady.
+            _log.Debug("DriverHost {Node}: dispatch {Id} matches current rev {Rev}; immediate ACK",
+                _localNode, msg.DeploymentId, msg.RevisionHash);
+            SendAck(msg.DeploymentId, ApplyAckOutcome.Applied, failureReason: null, msg.CorrelationId);
+            return;
+        }
+        ApplyAndAck(msg.DeploymentId, msg.RevisionHash, msg.CorrelationId);
+    }
+
+    /// <summary>
+    /// Records Applying, performs the (currently no-op) apply, records Applied or Failed, sends the
+    /// matching ACK and always ends in Steady.
+    /// </summary>
+    private void ApplyAndAck(DeploymentId deploymentId, RevisionHash revision, CorrelationId correlation)
+    {
+        _applyingDeploymentId = deploymentId;
+        Become(Applying);
+
+        // Persist Applying row (idempotent on PK).
+        UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applying, failureReason: null);
+
+        try
+        {
+            // Future: dispatch ApplyDelta to children, wait for acks. For Task 37/38, just no-op.
+            _currentRevision = revision;
+            // The upsert logs and swallows its own DB errors, so the ACK below is sent even if persisting failed.
+            UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);
+            SendAck(deploymentId, ApplyAckOutcome.Applied, failureReason: null, correlation);
+            _log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev})", _localNode, deploymentId, revision);
+        }
+        catch (Exception ex)
+        {
+            UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Failed, ex.Message);
+            SendAck(deploymentId, ApplyAckOutcome.Failed, ex.Message, correlation);
+            _log.Error(ex, "DriverHost {Node}: apply of {Id} failed", _localNode, deploymentId);
+        }
+        finally
+        {
+            _applyingDeploymentId = null;
+            Become(Steady);
+        }
+    }
+
+    /// <summary>
+    /// Reconnect probe. When the ConfigDb answers, stops the timer, adopts the most recently sealed
+    /// deployment (if any) as the current revision and returns to Steady; otherwise stays Stale.
+    /// </summary>
+    private void TryRecoverFromStale()
+    {
+        try
+        {
+            using var db = _dbFactory.CreateDbContext();
+            var latestSealed = db.Deployments
+                .AsNoTracking()
+                .Where(d => d.Status == DeploymentStatus.Sealed)
+                .OrderByDescending(d => d.SealedAtUtc)
+                .Select(d => new { d.DeploymentId, d.RevisionHash })
+                .FirstOrDefault();
+
+            _log.Info("DriverHost {Node}: ConfigDb back; recovering from Stale", _localNode);
+            Timers.Cancel(RetryTimerKey);
+
+            if (latestSealed is not null)
+            {
+                _currentRevision = RevisionHash.Parse(latestSealed.RevisionHash);
+                UpsertNodeDeploymentState(new DeploymentId(latestSealed.DeploymentId),
+                    NodeDeploymentStatus.Applied, failureReason: null);
+            }
+            Become(Steady);
+        }
+        catch (Exception ex)
+        {
+            // The periodic timer keeps running, so the next tick retries.
+            _log.Debug(ex, "DriverHost {Node}: still Stale; will retry in {Interval}", _localNode, ReconnectInterval);
+        }
+    }
+
+    /// <summary>
+    /// Inserts or updates this node's NodeDeploymentState row for <paramref name="deploymentId"/>.
+    /// Best-effort: database errors are logged and swallowed.
+    /// </summary>
+    private void UpsertNodeDeploymentState(DeploymentId deploymentId, NodeDeploymentStatus status, string? failureReason)
+    {
+        try
+        {
+            using var db = _dbFactory.CreateDbContext();
+            var existing = db.NodeDeploymentStates.FirstOrDefault(
+                x => x.NodeId == _localNode.Value && x.DeploymentId == deploymentId.Value);
+            if (existing is null)
+            {
+                db.NodeDeploymentStates.Add(new NodeDeploymentState
+                {
+                    NodeId = _localNode.Value,
+                    DeploymentId = deploymentId.Value,
+                    Status = status,
+                    FailureReason = failureReason,
+                    // Bootstrap picks the newest row by StartedAtUtc, so stamp it when the row is created.
+                    // NOTE(review): redundant but harmless if the entity already defaults this value.
+                    StartedAtUtc = DateTime.UtcNow,
+                    AppliedAtUtc = status == NodeDeploymentStatus.Applied ? DateTime.UtcNow : null,
+                });
+            }
+            else
+            {
+                existing.Status = status;
+                existing.FailureReason = failureReason;
+                if (status == NodeDeploymentStatus.Applied) existing.AppliedAtUtc = DateTime.UtcNow;
+            }
+            db.SaveChanges();
+        }
+        catch (Exception ex)
+        {
+            _log.Warning(ex, "DriverHost {Node}: failed to upsert NodeDeploymentState for {Id}", _localNode, deploymentId);
+        }
+    }
+
+    /// <summary>Sends an ApplyAck to the injected coordinator, or publishes it on the deployments topic when none was injected.</summary>
+    private void SendAck(DeploymentId deploymentId, ApplyAckOutcome outcome, string? failureReason, CorrelationId correlation)
+    {
+        var ack = new ApplyAck(deploymentId, _localNode, outcome, failureReason, correlation);
+        if (_coordinatorOverride is not null)
+        {
+            _coordinatorOverride.Tell(ack);
+        }
+        else
+        {
+            // No direct coordinator handle — publish back through DistributedPubSub so the
+            // singleton routes it. The coordinator subscribes to its own incoming topic.
+            DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentsTopic, ack));
+        }
+    }
+}
diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorTests.cs
new file mode 100644
index 0000000..0d2c9bb
--- /dev/null
+++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorTests.cs
@@ -0,0 +1,143 @@
+using Akka.Actor;
+using Microsoft.EntityFrameworkCore;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
+using ZB.MOM.WW.OtOpcUa.Commons.Types;
+using ZB.MOM.WW.OtOpcUa.Configuration;
+using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
+using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
+using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
+
+namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
+
+public sealed class DriverHostActorTests : RuntimeActorTestBase
+{
+    private static readonly NodeId TestNode = NodeId.Parse("driver-test");
+    private static readonly RevisionHash RevA = RevisionHash.Parse(new string('a', 64));
+    private static readonly RevisionHash RevB = RevisionHash.Parse(new string('b', 64));
+
+    [Fact]
+    public void Bootstrap_with_no_prior_state_enters_Steady()
+    {
+        var db = NewInMemoryDbFactory();
+        var coordinator = CreateTestProbe();
+        var actor = Sys.ActorOf(DriverHostActor.Props(db, TestNode, coordinator.Ref));
+
+        // No-rev Steady: an incoming dispatch should be processed as a fresh apply, not a no-op.
+        var deploymentId = SeedDeployment(db, RevA, DeploymentStatus.Sealed);
+        actor.Tell(new DispatchDeployment(deploymentId, RevA, CorrelationId.NewId()));
+
+        var ack = coordinator.ExpectMsg<ApplyAck>(TimeSpan.FromSeconds(5));
+        ack.Outcome.ShouldBe(ApplyAckOutcome.Applied);
+        ack.NodeId.ShouldBe(TestNode);
+    }
+
+    [Fact]
+    public void Same_revision_dispatch_is_acked_immediately_with_no_apply_work()
+    {
+        var db = NewInMemoryDbFactory();
+        var deploymentId = SeedDeployment(db, RevA, DeploymentStatus.Sealed);
+
+        // Seed an Applied NodeDeploymentState for self at RevA so PreStart recovers Steady@RevA.
+        using (var ctx = db.CreateDbContext())
+        {
+            ctx.NodeDeploymentStates.Add(new Configuration.Entities.NodeDeploymentState
+            {
+                NodeId = TestNode.Value,
+                DeploymentId = deploymentId.Value,
+                Status = NodeDeploymentStatus.Applied,
+                AppliedAtUtc = DateTime.UtcNow.AddMinutes(-1),
+            });
+            ctx.SaveChanges();
+        }
+
+        var coordinator = CreateTestProbe();
+        var actor = Sys.ActorOf(DriverHostActor.Props(db, TestNode, coordinator.Ref));
+
+        // Dispatch the SAME deployment again.
+        actor.Tell(new DispatchDeployment(deploymentId, RevA, CorrelationId.NewId()));
+
+        var ack = coordinator.ExpectMsg<ApplyAck>(TimeSpan.FromSeconds(5));
+        ack.Outcome.ShouldBe(ApplyAckOutcome.Applied);
+
+        // No new NodeDeploymentState row got added — the rev matched, so nothing changed.
+        using var verify = db.CreateDbContext();
+        verify.NodeDeploymentStates.Count(s => s.NodeId == TestNode.Value).ShouldBe(1);
+    }
+
+    [Fact]
+    public void New_revision_dispatch_writes_Applied_NodeDeploymentState()
+    {
+        var db = NewInMemoryDbFactory();
+        var deploymentB = SeedDeployment(db, RevB, DeploymentStatus.Dispatching);
+
+        var coordinator = CreateTestProbe();
+        var actor = Sys.ActorOf(DriverHostActor.Props(db, TestNode, coordinator.Ref));
+
+        actor.Tell(new DispatchDeployment(deploymentB, RevB, CorrelationId.NewId()));
+
+        coordinator.ExpectMsg<ApplyAck>(TimeSpan.FromSeconds(5)).Outcome.ShouldBe(ApplyAckOutcome.Applied);
+
+        AwaitAssert(() =>
+        {
+            using var verify = db.CreateDbContext();
+            var row = verify.NodeDeploymentStates.Single(s =>
+                s.NodeId == TestNode.Value && s.DeploymentId == deploymentB.Value);
+            row.Status.ShouldBe(NodeDeploymentStatus.Applied);
+            row.AppliedAtUtc.ShouldNotBeNull();
+        }, duration: TimeSpan.FromSeconds(3));
+    }
+
+    [Fact]
+    public void Orphan_Applying_row_on_bootstrap_replays_apply()
+    {
+        var db = NewInMemoryDbFactory();
+        var deploymentId = SeedDeployment(db, RevA, DeploymentStatus.AwaitingApplyAcks);
+
+        // Crash-orphan: a prior actor was mid-apply and never finished.
+        using (var ctx = db.CreateDbContext())
+        {
+            ctx.NodeDeploymentStates.Add(new Configuration.Entities.NodeDeploymentState
+            {
+                NodeId = TestNode.Value,
+                DeploymentId = deploymentId.Value,
+                Status = NodeDeploymentStatus.Applying,
+                StartedAtUtc = DateTime.UtcNow.AddMinutes(-2),
+            });
+            ctx.SaveChanges();
+        }
+
+        var coordinator = CreateTestProbe();
+        Sys.ActorOf(DriverHostActor.Props(db, TestNode, coordinator.Ref));
+
+        // PreStart should replay → ApplyAck back to coordinator with the new correlation id.
+        var ack = coordinator.ExpectMsg<ApplyAck>(TimeSpan.FromSeconds(5));
+        ack.DeploymentId.ShouldBe(deploymentId);
+        ack.Outcome.ShouldBe(ApplyAckOutcome.Applied);
+
+        using var verify = db.CreateDbContext();
+        verify.NodeDeploymentStates.Single(s =>
+                s.NodeId == TestNode.Value && s.DeploymentId == deploymentId.Value)
+            .Status.ShouldBe(NodeDeploymentStatus.Applied);
+    }
+
+    private static DeploymentId SeedDeployment( // inserts one Deployment row and returns its new id
+        IDbContextFactory<OtOpcUaConfigDbContext> db,
+        RevisionHash rev,
+        DeploymentStatus status)
+    {
+        var id = DeploymentId.NewId();
+        using var ctx = db.CreateDbContext();
+        ctx.Deployments.Add(new Configuration.Entities.Deployment
+        {
+            DeploymentId = id.Value,
+            RevisionHash = rev.Value,
+            Status = status,
+            CreatedBy = "test",
+            SealedAtUtc = status == DeploymentStatus.Sealed ? DateTime.UtcNow : null, // only sealed rows get a seal time
+        });
+        ctx.SaveChanges();
+        return id;
+    }
+}
diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Harness/RuntimeActorTestBase.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Harness/RuntimeActorTestBase.cs
new file mode 100644
index 0000000..2eb25b2
--- /dev/null
+++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Harness/RuntimeActorTestBase.cs
@@ -0,0 +1,59 @@
+using Akka.Cluster;
+using Akka.TestKit.Xunit2;
+using Microsoft.EntityFrameworkCore;
+using ZB.MOM.WW.OtOpcUa.Configuration;
+
+namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
+
+///
+/// Mirrors the ControlPlane test harness: single-node Akka cluster (self-join) + in-memory
+/// EF Core via an <see cref="IDbContextFactory{TContext}"/>.
+/// The constructor joins the node to itself and waits until a cluster member reports Up.
+public abstract class RuntimeActorTestBase : TestKit
+{
+    protected static string AkkaTestHocon => @"
+akka {
+  loglevel = ""WARNING""
+  extensions = [
+    ""Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubExtensionProvider, Akka.Cluster.Tools""
+  ]
+  actor {
+    provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
+  }
+  remote.dot-netty.tcp {
+    hostname = ""127.0.0.1""
+    port = 0
+  }
+  cluster {
+    seed-nodes = []
+    roles = [""driver""]
+    min-nr-of-members = 1
+    run-coordinated-shutdown-when-down = off
+  }
+}";
+
+    protected RuntimeActorTestBase() : base(AkkaTestHocon)
+    {
+        var cluster = Akka.Cluster.Cluster.Get(Sys);
+        cluster.Join(cluster.SelfAddress); // seed-nodes is empty, so the node must join itself
+        AwaitCondition(() => cluster.State.Members.Any(m => m.Status == MemberStatus.Up),
+            TimeSpan.FromSeconds(5));
+    }
+
+    protected static IDbContextFactory<OtOpcUaConfigDbContext> NewInMemoryDbFactory(string? dbName = null)
+    {
+        dbName ??= Guid.NewGuid().ToString("N"); // fresh database name per call unless the caller supplies one
+        return new InMemoryConfigDbFactory(dbName);
+    }
+
+    private sealed class InMemoryConfigDbFactory(string dbName) : IDbContextFactory<OtOpcUaConfigDbContext>
+    {
+        public OtOpcUaConfigDbContext CreateDbContext()
+        {
+            var opts = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
+                .UseInMemoryDatabase(dbName) // every context from this factory uses the same database name
+                .Options;
+            return new OtOpcUaConfigDbContext(opts);
+        }
+    }
+}
diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ZB.MOM.WW.OtOpcUa.Runtime.Tests.csproj b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ZB.MOM.WW.OtOpcUa.Runtime.Tests.csproj
new file mode 100644
index 0000000..3417620
--- /dev/null
+++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ZB.MOM.WW.OtOpcUa.Runtime.Tests.csproj
@@ -0,0 +1,34 @@
+
+
+
+ false
+ true
+ ZB.MOM.WW.OtOpcUa.Runtime.Tests
+ true
+
+
+
+
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+