feat(controlplane): ConfigPublishCoordinator deadline timeout + failover PreStart recovery
This commit is contained in:
@@ -21,25 +21,78 @@ namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Coordinators;
|
|||||||
/// Discovery of the "expected ACK set" comes from <c>Akka.Cluster.State.Members</c> filtered by
|
/// Discovery of the "expected ACK set" comes from <c>Akka.Cluster.State.Members</c> filtered by
|
||||||
/// the <c>driver</c> role — the DB does not own per-node role assignment.
|
/// the <c>driver</c> role — the DB does not own per-node role assignment.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class ConfigPublishCoordinator : ReceiveActor
|
public sealed class ConfigPublishCoordinator : ReceiveActor, IWithTimers
|
||||||
{
|
{
|
||||||
public const string DeploymentsTopic = "deployments";
|
public const string DeploymentsTopic = "deployments";
|
||||||
|
public static readonly TimeSpan DefaultApplyDeadline = TimeSpan.FromMinutes(2);
|
||||||
|
|
||||||
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
|
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
|
||||||
|
private readonly TimeSpan _applyDeadline;
|
||||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||||
private readonly Dictionary<NodeId, ApplyAckOutcome> _acks = new();
|
private readonly Dictionary<NodeId, ApplyAckOutcome> _acks = new();
|
||||||
|
|
||||||
private DeploymentId? _current;
|
private DeploymentId? _current;
|
||||||
private HashSet<NodeId> _expectedAcks = new();
|
private HashSet<NodeId> _expectedAcks = new();
|
||||||
|
|
||||||
public static Props Props(IDbContextFactory<OtOpcUaConfigDbContext> dbFactory) =>
|
public ITimerScheduler Timers { get; set; } = null!;
|
||||||
Akka.Actor.Props.Create(() => new ConfigPublishCoordinator(dbFactory));
|
|
||||||
|
|
||||||
public ConfigPublishCoordinator(IDbContextFactory<OtOpcUaConfigDbContext> dbFactory)
|
/// <summary>
/// Builds the <see cref="Akka.Actor.Props"/> used to spawn a <see cref="ConfigPublishCoordinator"/>.
/// </summary>
/// <param name="dbFactory">Factory handed to the coordinator for creating config DB contexts.</param>
/// <param name="applyDeadline">
/// Deadline passed to the coordinator; <see cref="DefaultApplyDeadline"/> is used when omitted.
/// </param>
/// <returns>Props that construct the coordinator with the resolved deadline.</returns>
public static Props Props(
    IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
    TimeSpan? applyDeadline = null)
{
    // Resolve the optional deadline up front so the factory expression only captures plain values.
    var resolvedDeadline = applyDeadline ?? DefaultApplyDeadline;
    return Akka.Actor.Props.Create(() => new ConfigPublishCoordinator(dbFactory, resolvedDeadline));
}
|
||||||
|
|
||||||
|
/// <summary>
/// Creates the coordinator and registers its message handlers.
/// </summary>
/// <param name="dbFactory">Factory used to open a config DB context whenever state is read or written.</param>
/// <param name="applyDeadline">Duration used for the apply-deadline timer.</param>
/// <exception cref="ArgumentNullException"><paramref name="dbFactory"/> is <c>null</c>.</exception>
public ConfigPublishCoordinator(
    IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
    TimeSpan applyDeadline)
{
    // Fail at construction instead of with a NullReferenceException on the first DB access in PreStart.
    _dbFactory = dbFactory ?? throw new ArgumentNullException(nameof(dbFactory));
    _applyDeadline = applyDeadline;

    Receive<DispatchDeployment>(HandleDispatch);
    Receive<ApplyAck>(HandleAck);
    Receive<DeadlineElapsed>(HandleDeadline);
}
|
||||||
|
|
||||||
|
/// <summary>
/// On startup, recovers the most recently created deployment that is still in
/// <c>Dispatching</c> or <c>AwaitingApplyAcks</c>. The expected ACK set is rebuilt from the
/// deployment's <c>NodeDeploymentState</c> rows, rows that are no longer <c>Applying</c> are
/// replayed as ACKs that already landed, and the deadline timer is resumed with whatever time
/// is left. Does nothing when no such deployment exists.
/// </summary>
protected override void PreStart()
{
    using var db = _dbFactory.CreateDbContext();
    var inflight = db.Deployments
        .Where(d => d.Status == DeploymentStatus.Dispatching || d.Status == DeploymentStatus.AwaitingApplyAcks)
        .OrderByDescending(d => d.CreatedAtUtc)
        .FirstOrDefault();
    if (inflight is null)
    {
        return;
    }

    _current = new DeploymentId(inflight.DeploymentId);
    var nodeStates = db.NodeDeploymentStates
        .Where(x => x.DeploymentId == inflight.DeploymentId)
        .AsNoTracking()
        .ToList();

    // Every row is an expected ACK; rows that already left Applying count as landed ACKs.
    // Each node id is parsed once and reused for both collections.
    _expectedAcks = new HashSet<NodeId>();
    foreach (var state in nodeStates)
    {
        var nodeId = NodeId.Parse(state.NodeId);
        _expectedAcks.Add(nodeId);
        if (state.Status == NodeDeploymentStatus.Applying)
        {
            continue;
        }

        _acks[nodeId] = state.Status == NodeDeploymentStatus.Applied
            ? ApplyAckOutcome.Applied
            : ApplyAckOutcome.Failed;
    }

    // Resume the deadline timer using the remaining time. Deployment.CreatedAtUtc is used as a
    // proxy for the dispatch time — there is no separate "dispatched at" column.
    var elapsed = DateTime.UtcNow - inflight.CreatedAtUtc;
    if (elapsed < TimeSpan.Zero)
    {
        // A stored timestamp ahead of the local clock must not stretch the timer beyond the
        // configured deadline.
        elapsed = TimeSpan.Zero;
    }

    var remaining = _applyDeadline - elapsed;
    if (remaining <= TimeSpan.Zero)
    {
        // Deadline already passed: deliver the timeout as an ordinary message.
        Self.Tell(new DeadlineElapsed(_current.Value));
    }
    else
    {
        Timers.StartSingleTimer(DeadlineTimerKey, new DeadlineElapsed(_current.Value), remaining);
    }

    _log.Info("Coordinator recovered in-flight deployment {Id} ({Acked}/{Total} acks landed)",
        _current, _acks.Count, _expectedAcks.Count);
}
|
||||||
|
|
||||||
private void HandleDispatch(DispatchDeployment msg)
|
private void HandleDispatch(DispatchDeployment msg)
|
||||||
@@ -65,6 +118,7 @@ public sealed class ConfigPublishCoordinator : ReceiveActor
|
|||||||
}
|
}
|
||||||
|
|
||||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentsTopic, msg));
|
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentsTopic, msg));
|
||||||
|
Timers.StartSingleTimer(DeadlineTimerKey, new DeadlineElapsed(msg.DeploymentId), _applyDeadline);
|
||||||
|
|
||||||
if (_expectedAcks.Count == 0)
|
if (_expectedAcks.Count == 0)
|
||||||
{
|
{
|
||||||
@@ -132,13 +186,40 @@ public sealed class ConfigPublishCoordinator : ReceiveActor
|
|||||||
ResetForNext();
|
ResetForNext();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
/// Handles expiry of the apply deadline. A message for any deployment other than the current
/// one is discarded as stale; otherwise the current deployment is marked <c>TimedOut</c>, the
/// change is saved, and the coordinator state is reset.
/// </summary>
/// <param name="msg">Deadline notification carrying the deployment id it was scheduled for.</param>
private void HandleDeadline(DeadlineElapsed msg)
{
    if (_current is null || msg.DeploymentId != _current.Value)
    {
        _log.Debug("Discarding stale DeadlineElapsed for {Id} (current={Current})",
            msg.DeploymentId, _current);
        return;
    }

    // Messages are handled one at a time and ResetForNext() clears _current, so getting here
    // means this instance has not finalized the deployment — even when the ACK counts match
    // (state rebuilt in PreStart with every ACK already persisted, or with no node rows at all).
    // The single-shot timer is spent at this point, so returning without a status change would
    // leave the deployment in-flight indefinitely. Always move it to a terminal state.
    using var db = _dbFactory.CreateDbContext();
    UpdateDeploymentStatus(db, _current.Value, DeploymentStatus.TimedOut);
    db.SaveChanges();
    _log.Warning("Deployment {Id} timed out after {Deadline} ({Acked}/{Total} acks landed)",
        _current.Value, _applyDeadline, _acks.Count, _expectedAcks.Count);
    ResetForNext();
}
|
||||||
|
|
||||||
/// <summary>
/// Clears all per-deployment state — recorded ACKs, the expected ACK set, the current
/// deployment id — and cancels the deadline timer.
/// </summary>
private void ResetForNext()
{
    _acks.Clear();
    _expectedAcks.Clear();
    _current = null;
    Timers.Cancel(DeadlineTimerKey);
}
|
||||||
|
|
||||||
|
private const string DeadlineTimerKey = "apply-deadline";
|
||||||
|
|
||||||
|
public sealed record DeadlineElapsed(DeploymentId DeploymentId);
|
||||||
|
|
||||||
private static void UpdateDeploymentStatus(
|
private static void UpdateDeploymentStatus(
|
||||||
OtOpcUaConfigDbContext db, DeploymentId id, DeploymentStatus status, bool sealNow = false)
|
OtOpcUaConfigDbContext db, DeploymentId id, DeploymentStatus status, bool sealNow = false)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -0,0 +1,120 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.ControlPlane.Coordinators;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests;
|
||||||
|
|
||||||
|
/// <summary>
/// Tests for the apply-deadline handling and the PreStart recovery of
/// <see cref="ConfigPublishCoordinator"/>.
/// </summary>
public sealed class ConfigPublishCoordinatorTimeoutTests : ControlPlaneActorTestBase
{
    private static readonly RevisionHash TestRevision = RevisionHash.Parse(new string('b', 64));

    [Fact]
    public void DeadlineElapsed_for_current_deployment_marks_TimedOut()
    {
        var dbFactory = NewInMemoryDbFactory();
        var deploymentId = SeedDispatchingDeployment(dbFactory);

        // Seed the NodeDeploymentState row BEFORE the actor is created. PreStart runs
        // asynchronously after ActorOf returns, so seeding afterwards races with recovery and
        // the coordinator may come up with zero expected acks. We pretend a driver node exists.
        using (var db = dbFactory.CreateDbContext())
        {
            db.NodeDeploymentStates.Add(new Configuration.Entities.NodeDeploymentState
            {
                NodeId = "phantom-driver",
                DeploymentId = deploymentId.Value,
                Status = NodeDeploymentStatus.Applying,
            });
            db.SaveChanges();
        }

        // Short deadline; the explicit message below drives the timeout regardless of the timer.
        var actor = Sys.ActorOf(ConfigPublishCoordinator.Props(dbFactory, TimeSpan.FromMilliseconds(150)));

        // Tell the actor the deadline elapsed for this id rather than relying on timing.
        actor.Tell(new ConfigPublishCoordinator.DeadlineElapsed(deploymentId));

        AwaitAssert(() =>
        {
            using var db = dbFactory.CreateDbContext();
            db.Deployments.Single().Status.ShouldBe(DeploymentStatus.TimedOut);
        }, duration: TimeSpan.FromSeconds(3));
    }

    [Fact]
    public void Stale_DeadlineElapsed_for_other_deployment_is_ignored()
    {
        var dbFactory = NewInMemoryDbFactory();
        var deploymentId = SeedDispatchingDeployment(dbFactory);
        var actor = Sys.ActorOf(ConfigPublishCoordinator.Props(dbFactory, TimeSpan.FromMinutes(1)));

        // Tell the actor a deadline elapsed for a completely different deployment id.
        actor.Tell(new ConfigPublishCoordinator.DeadlineElapsed(DeploymentId.NewId()));

        // Give the actor time to process the message; the seeded deployment should remain in
        // a non-timed-out state (no transition triggered).
        ExpectNoMsg(TimeSpan.FromMilliseconds(250));
        using var db = dbFactory.CreateDbContext();
        var status = db.Deployments.Single().Status;
        status.ShouldBeOneOf(DeploymentStatus.Dispatching, DeploymentStatus.AwaitingApplyAcks, DeploymentStatus.Sealed);
        status.ShouldNotBe(DeploymentStatus.TimedOut);
    }

    [Fact]
    public void PreStart_recovers_inflight_deployment_state()
    {
        var dbFactory = NewInMemoryDbFactory();
        var deploymentId = SeedDispatchingDeployment(dbFactory, status: DeploymentStatus.AwaitingApplyAcks);

        // Seed two NodeDeploymentState rows — one already Applied, one still Applying.
        using (var db = dbFactory.CreateDbContext())
        {
            db.NodeDeploymentStates.Add(new Configuration.Entities.NodeDeploymentState
            {
                NodeId = "driver-a", DeploymentId = deploymentId.Value, Status = NodeDeploymentStatus.Applied,
            });
            db.NodeDeploymentStates.Add(new Configuration.Entities.NodeDeploymentState
            {
                NodeId = "driver-b", DeploymentId = deploymentId.Value, Status = NodeDeploymentStatus.Applying,
            });
            db.SaveChanges();
        }

        // Start a fresh coordinator — simulates singleton failover to this node.
        var actor = Sys.ActorOf(ConfigPublishCoordinator.Props(dbFactory, TimeSpan.FromMinutes(5)));

        // Send the missing ACK; the recovered state should expect exactly that node, and the
        // deployment should now seal (both nodes acked Applied).
        actor.Tell(new ApplyAck(deploymentId, NodeId.Parse("driver-b"),
            ApplyAckOutcome.Applied, null, CorrelationId.NewId()));

        AwaitAssert(() =>
        {
            using var db = dbFactory.CreateDbContext();
            db.Deployments.Single().Status.ShouldBe(DeploymentStatus.Sealed);
        }, duration: TimeSpan.FromSeconds(3));
    }

    /// <summary>
    /// Inserts a single deployment row with the given status and returns its id.
    /// </summary>
    private static DeploymentId SeedDispatchingDeployment(
        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
        DeploymentStatus status = DeploymentStatus.Dispatching)
    {
        var id = DeploymentId.NewId();
        using var db = dbFactory.CreateDbContext();
        db.Deployments.Add(new Configuration.Entities.Deployment
        {
            DeploymentId = id.Value,
            RevisionHash = TestRevision.Value,
            Status = status,
            CreatedBy = "test",
        });
        db.SaveChanges();
        return id;
    }
}
|
||||||
Reference in New Issue
Block a user