fix(runtime): fast-fail RouteNodeWrite while Stale + micro-opts + raw-blob routing test
This commit is contained in:
@@ -92,10 +92,11 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
/// NodeId(s)</c>. Rebuilt every apply by <see cref="PushDesiredSubscriptions"/> from the
|
/// NodeId(s)</c>. Rebuilt every apply by <see cref="PushDesiredSubscriptions"/> from the
|
||||||
/// composition's <c>EquipmentTags</c> (mirroring <c>VirtualTagHostActor._nodeIdByVtag</c>), and
|
/// composition's <c>EquipmentTags</c> (mirroring <c>VirtualTagHostActor._nodeIdByVtag</c>), and
|
||||||
/// resolved in <see cref="ForwardToMux"/> so a driver value published by wire-ref FullName lands
|
/// resolved in <see cref="ForwardToMux"/> so a driver value published by wire-ref FullName lands
|
||||||
/// on the variable's actual folder-scoped NodeId. A list because the same driver ref can back
|
/// on the variable's actual folder-scoped NodeId. A set because the same driver ref can back
|
||||||
/// several equipment variables (e.g. identical machines sharing a register).
|
/// several equipment variables (e.g. identical machines sharing a register), and the per-apply
|
||||||
|
/// rebuild dedups by NodeId.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private readonly Dictionary<(string DriverInstanceId, string FullName), List<string>> _nodeIdByDriverRef = new();
|
private readonly Dictionary<(string DriverInstanceId, string FullName), HashSet<string>> _nodeIdByDriverRef = new();
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Inverse of <see cref="_nodeIdByDriverRef"/>: <c>folder-scoped equipment NodeId →
|
/// Inverse of <see cref="_nodeIdByDriverRef"/>: <c>folder-scoped equipment NodeId →
|
||||||
@@ -516,7 +517,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
? new NodeWriteResult(t.Result.Success, t.Result.Reason)
|
? new NodeWriteResult(t.Result.Success, t.Result.Reason)
|
||||||
: new NodeWriteResult(false, "write timeout"),
|
: new NodeWriteResult(false, "write timeout"),
|
||||||
CancellationToken.None,
|
CancellationToken.None,
|
||||||
TaskContinuationOptions.ExecuteSynchronously,
|
TaskContinuationOptions.None,
|
||||||
TaskScheduler.Default)
|
TaskScheduler.Default)
|
||||||
.PipeTo(replyTo);
|
.PipeTo(replyTo);
|
||||||
}
|
}
|
||||||
@@ -545,6 +546,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
Receive<RestartDriver>(HandleRestartDriver);
|
Receive<RestartDriver>(HandleRestartDriver);
|
||||||
Receive<ReconnectDriver>(HandleReconnectDriver);
|
Receive<ReconnectDriver>(HandleReconnectDriver);
|
||||||
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
|
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
|
||||||
|
// An inbound operator write can't be serviced while the config DB is unreachable — fast-fail so the
|
||||||
|
// node-manager's bounded Ask gets an immediate clear status instead of dead-lettering into a timeout.
|
||||||
|
Receive<RouteNodeWrite>(_ =>
|
||||||
|
Sender.Tell(new NodeWriteResult(false, "driver host stale (config DB unreachable)")));
|
||||||
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
||||||
Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval);
|
Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval);
|
||||||
}
|
}
|
||||||
@@ -751,9 +756,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
{
|
{
|
||||||
var key = (t.DriverInstanceId, t.FullName);
|
var key = (t.DriverInstanceId, t.FullName);
|
||||||
var nodeId = EquipmentNodeIds.Variable(t.EquipmentId, t.FolderPath, t.Name);
|
var nodeId = EquipmentNodeIds.Variable(t.EquipmentId, t.FolderPath, t.Name);
|
||||||
if (!_nodeIdByDriverRef.TryGetValue(key, out var list))
|
if (!_nodeIdByDriverRef.TryGetValue(key, out var set))
|
||||||
_nodeIdByDriverRef[key] = list = new List<string>();
|
_nodeIdByDriverRef[key] = set = new HashSet<string>(StringComparer.Ordinal);
|
||||||
if (!list.Contains(nodeId)) list.Add(nodeId);
|
set.Add(nodeId);
|
||||||
_driverRefByNodeId[nodeId] = key;
|
_driverRefByNodeId[nodeId] = key;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+130
@@ -125,6 +125,61 @@ public sealed class DriverHostActorWriteRoutingTests : RuntimeActorTestBase
|
|||||||
recorder.Writes.ShouldBeEmpty();
|
recorder.Writes.ShouldBeEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>The router keys purely on NodeId — the tag's TagConfig blob shape is irrelevant. A tag
|
||||||
|
/// seeded with a RAW protocol-driver config blob (Modbus-shaped, no <c>FullName</c> key) routes the
|
||||||
|
/// write to its owning child exactly like the Galaxy-style <c>{FullName}</c> blob does, because the
|
||||||
|
/// reverse map is built from the resolved <c>FullName</c> the composer projects, not the raw blob.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Primary_routes_write_for_raw_protocol_blob_tag()
|
||||||
|
{
|
||||||
|
var db = NewInMemoryDbFactory();
|
||||||
|
var recorder = new RecordingDriverFactory("Modbus");
|
||||||
|
// Seed the tag with a RAW protocol blob ({region/address/dataType}) instead of {FullName}; the
|
||||||
|
// composer still resolves a FullName, so the reverse map keys on that and the blob never matters.
|
||||||
|
var deploymentId = SeedDeploymentWithRawBlobTag(db, RevA,
|
||||||
|
equip: "eq-2", driver: "drv-2", fullName: "40002", name: "torque");
|
||||||
|
|
||||||
|
var actor = SpawnHostAndApply(db, deploymentId, recorder);
|
||||||
|
|
||||||
|
// Local role unknown ⇒ treated as Primary ⇒ write allowed.
|
||||||
|
var asker = CreateTestProbe();
|
||||||
|
actor.Tell(new DriverHostActor.RouteNodeWrite("eq-2/torque", 456.0), asker.Ref);
|
||||||
|
|
||||||
|
var result = asker.ExpectMsg<DriverHostActor.NodeWriteResult>(Timeout);
|
||||||
|
result.Success.ShouldBeTrue();
|
||||||
|
|
||||||
|
// The write was forwarded to the owning child keyed by the resolved FullName, not the blob.
|
||||||
|
AwaitAssert(() =>
|
||||||
|
{
|
||||||
|
recorder.Writes.Count.ShouldBe(1);
|
||||||
|
recorder.Writes[0].FullReference.ShouldBe("40002");
|
||||||
|
recorder.Writes[0].Value.ShouldBe(456.0);
|
||||||
|
}, duration: Timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>A RouteNodeWrite arriving while the host is Stale (config DB unreachable) must fast-fail
|
||||||
|
/// with an immediate negative NodeWriteResult (reason mentions "stale") instead of dead-lettering into
|
||||||
|
/// the node-manager's bounded-Ask timeout. Drives the host into Stale via a DB factory whose
|
||||||
|
/// CreateDbContext throws on bootstrap (the same fall-through to <c>Become(Stale)</c> production uses).</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Stale_host_fast_fails_route_node_write()
|
||||||
|
{
|
||||||
|
// A factory that always throws on CreateDbContext ⇒ Bootstrap's try fails ⇒ Become(Stale).
|
||||||
|
var db = new ThrowingDbFactory();
|
||||||
|
var coordinator = CreateTestProbe();
|
||||||
|
var actor = Sys.ActorOf(DriverHostActor.Props(
|
||||||
|
db, TestNode, coordinator.Ref,
|
||||||
|
localRoles: new HashSet<string> { "driver" }));
|
||||||
|
|
||||||
|
var asker = CreateTestProbe();
|
||||||
|
actor.Tell(new DriverHostActor.RouteNodeWrite("eq-1/speed", 123.0), asker.Ref);
|
||||||
|
|
||||||
|
var result = asker.ExpectMsg<DriverHostActor.NodeWriteResult>(TimeSpan.FromSeconds(2));
|
||||||
|
result.Success.ShouldBeFalse();
|
||||||
|
result.Reason.ShouldNotBeNull();
|
||||||
|
result.Reason!.ShouldContain("stale");
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Spawns the host with the recording driver factory, dispatches the deployment, and waits
|
/// <summary>Spawns the host with the recording driver factory, dispatches the deployment, and waits
|
||||||
/// for the Applied ACK so the apply (and thus the reverse-map build in PushDesiredSubscriptions) has
|
/// for the Applied ACK so the apply (and thus the reverse-map build in PushDesiredSubscriptions) has
|
||||||
/// completed before the test routes a write. No OPC UA / mux probes are wired — this test exercises
|
/// completed before the test routes a write. No OPC UA / mux probes are wired — this test exercises
|
||||||
@@ -200,6 +255,81 @@ public sealed class DriverHostActorWriteRoutingTests : RuntimeActorTestBase
|
|||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Seeds a single-tag Sealed deployment exactly like <see cref="SeedDeploymentWithEquipmentTags"/>,
|
||||||
|
/// except the tag's <c>TagConfig</c> is a RAW protocol-driver blob (Modbus-shaped:
|
||||||
|
/// <c>{FullName, region, address, dataType}</c>) instead of the bare Galaxy-style
|
||||||
|
/// <c>{FullName}</c> blob. The composer keys the reverse map purely on the blob's <c>FullName</c>
|
||||||
|
/// (<c>ExtractTagFullName</c> reads only that field), so the extra raw protocol keys alongside it
|
||||||
|
/// are irrelevant — proving routing is independent of the blob's broader shape.
|
||||||
|
/// </summary>
|
||||||
|
private static DeploymentId SeedDeploymentWithRawBlobTag(
|
||||||
|
IDbContextFactory<OtOpcUaConfigDbContext> db, RevisionHash rev,
|
||||||
|
string equip, string driver, string fullName, string name)
|
||||||
|
{
|
||||||
|
var artifact = JsonSerializer.SerializeToUtf8Bytes(new
|
||||||
|
{
|
||||||
|
Namespaces = new[]
|
||||||
|
{
|
||||||
|
new { NamespaceId = "ns-eq", Kind = 0 }, // NamespaceKind.Equipment = 0
|
||||||
|
},
|
||||||
|
DriverInstances = new[]
|
||||||
|
{
|
||||||
|
new
|
||||||
|
{
|
||||||
|
DriverInstanceRowId = Guid.NewGuid(),
|
||||||
|
DriverInstanceId = driver,
|
||||||
|
Name = driver,
|
||||||
|
DriverType = "Modbus", // not Windows-only ⇒ a real child is spawned (not stubbed)
|
||||||
|
Enabled = true,
|
||||||
|
DriverConfig = "{}",
|
||||||
|
NamespaceId = "ns-eq",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Tags = new[]
|
||||||
|
{
|
||||||
|
new
|
||||||
|
{
|
||||||
|
TagId = "tag-raw",
|
||||||
|
EquipmentId = equip,
|
||||||
|
DriverInstanceId = driver,
|
||||||
|
Name = name,
|
||||||
|
FolderPath = (string?)null,
|
||||||
|
DataType = "Double",
|
||||||
|
// RAW protocol-driver TagConfig: FullName alongside the actual Modbus wire fields
|
||||||
|
// (region/address/dataType), NOT the bare Galaxy {FullName} blob. The composer extracts
|
||||||
|
// only FullName, proving the extra protocol keys don't change routing.
|
||||||
|
TagConfig = JsonSerializer.Serialize(
|
||||||
|
new { FullName = fullName, region = "HoldingRegister", address = 200, dataType = "UInt16" }),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
var id = DeploymentId.NewId();
|
||||||
|
using var ctx = db.CreateDbContext();
|
||||||
|
ctx.Deployments.Add(new Deployment
|
||||||
|
{
|
||||||
|
DeploymentId = id.Value,
|
||||||
|
RevisionHash = rev.Value,
|
||||||
|
Status = DeploymentStatus.Sealed,
|
||||||
|
CreatedBy = "test",
|
||||||
|
SealedAtUtc = DateTime.UtcNow,
|
||||||
|
ArtifactBlob = artifact,
|
||||||
|
});
|
||||||
|
ctx.SaveChanges();
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>An <see cref="IDbContextFactory{TContext}"/> whose <c>CreateDbContext</c> always throws,
|
||||||
|
/// driving <see cref="DriverHostActor"/>'s bootstrap into the <c>catch</c> ⇒ <c>Become(Stale)</c> path
|
||||||
|
/// so a write can be routed at a Stale host.</summary>
|
||||||
|
private sealed class ThrowingDbFactory : IDbContextFactory<OtOpcUaConfigDbContext>
|
||||||
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
|
public OtOpcUaConfigDbContext CreateDbContext() =>
|
||||||
|
throw new InvalidOperationException("config DB unreachable (test stub)");
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Factory producing a single <see cref="RecordingDriver"/> for the supported type, whose
|
/// <summary>Factory producing a single <see cref="RecordingDriver"/> for the supported type, whose
|
||||||
/// recorded write list is exposed for assertions.</summary>
|
/// recorded write list is exposed for assertions.</summary>
|
||||||
private sealed class RecordingDriverFactory : IDriverFactory
|
private sealed class RecordingDriverFactory : IDriverFactory
|
||||||
|
|||||||
Reference in New Issue
Block a user