docs(drivers): implementation plan for reconfigure-while-faulted (#7)
This commit is contained in:
@@ -0,0 +1,347 @@
|
||||
# Driver-reconfigure-while-faulted — Implementation Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:subagent-driven-development to implement this plan task-by-task.
|
||||
|
||||
**Goal:** A `DriverInstanceActor` that is `Connecting`/`Reconnecting` adopts a corrected config delivered via `ApplyDelta` and re-initialises with it (instead of dead-lettering and retrying the stale config forever), with the adopted config guaranteed to win against an old initialise still in flight.
|
||||
|
||||
**Architecture:** Add a monotonic **generation** to each `InitializeAsync` attempt; tag the `InitializeSucceeded`/`InitializeFailed` result records with it and drop superseded results in the `Connecting`/`Reconnecting` handlers. Add a `Receive<ApplyDelta>` to both not-connected behaviours that calls `InitializeAsync(newConfig)` (swaps `_currentConfigJson`, bumps the generation, retries immediately) and replies `ApplyResult(true,…)`. Contained to one actor; no host/contract/EF change.
|
||||
|
||||
**Tech Stack:** C# / .NET 10, Akka.NET `ReceiveActor`, xUnit + Shouldly, Akka.TestKit.
|
||||
|
||||
**Design doc:** `docs/plans/2026-06-14-driver-reconfigure-while-faulted-design.md` (committed `076ca025`).
|
||||
|
||||
---
|
||||
|
||||
### Task 0: Feature branch (already created)
|
||||
|
||||
**Classification:** trivial
|
||||
**Estimated implement time:** ~0 min (precondition already met)
|
||||
**Parallelizable with:** none
|
||||
|
||||
The branch `feat/driver-reconfigure-while-faulted` already exists off master `f9be3843`, and the
|
||||
design doc is committed on it (`076ca025`). No action — included only so the task graph records the
|
||||
precondition. Implementers for later tasks commit **on this branch** (never on master), stage **by
|
||||
path** (never `git add .`), and never stage `sql_login.txt`, `src/Server/ZB.MOM.WW.OtOpcUa.Host/pki/`,
|
||||
`pending.md`, `current.md`, or `docker-dev/docker-compose.yml`.
|
||||
|
||||
---
|
||||
|
||||
### Task 1: Adopt corrected config mid-(re)connect (`DriverInstanceActor`)
|
||||
|
||||
**Classification:** high-risk (actor state machine + concurrency)
|
||||
**Estimated implement time:** ~6 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs`
|
||||
- Test: `tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs`
|
||||
|
||||
This is one atomic actor change (generation guard + adopt handlers are interdependent — neither is
|
||||
meaningfully landable alone) plus its tests, in a single production file + single test file. Follow
|
||||
TDD: extend the stub, write the two failing tests, implement, then prove the whole `DriverInstanceActorTests`
|
||||
class + the Runtime suite stay green.
|
||||
|
||||
**Background the implementer needs (don't re-derive):**
|
||||
- State machine: `Connecting()` (~line 217), `Connected()` (~258), `Reconnecting()` (~302),
|
||||
`Stubbed()` (~204). Only `Connected`/`Stubbed` currently handle `ApplyDelta`. The bug: `ApplyDelta`
|
||||
dead-letters in `Connecting`/`Reconnecting`.
|
||||
- `InitializeAsync` (~line 338) runs the driver init on a `Task.Run` and `self.Tell`s
|
||||
`InitializeSucceeded`/`InitializeFailed`. The `Reconnecting` `RetryConnect` timer re-calls
|
||||
`InitializeAsync(_currentConfigJson)` every `_reconnectInterval`.
|
||||
- `InitializeSucceeded`/`InitializeFailed` are **never** constructed outside this file (verified), so
|
||||
adding a `Generation` param is safe. Existing tests drive state only via `InitializeRequested` +
|
||||
polling `driver.InitializeCount` — they don't construct the result records.
|
||||
- `ApplyDelta` is `Tell`'d fire-and-forget by `DriverHostActor`; its `ApplyResult` reply already
|
||||
dead-letters harmlessly (no host handler). Reply anyway, for consistency + testability.
|
||||
- Observability for "did we connect, and on which config": with a `SetDesiredSubscriptions` set, a
|
||||
`Connected` entry auto-subscribes (`ResubscribeDesired` → `driver.SubscribeAsync`), so
|
||||
`driver.SubscribeCount` is the deterministic probe.
|
||||
|
||||
**Step 1: Extend the private `StubDriver` (additive — do not break existing fields/tests)**
|
||||
|
||||
In `DriverInstanceActorTests.cs`, extend the existing private `StubDriver` so a test can (a) capture
|
||||
each config passed to `InitializeAsync`, and (b) optionally gate/branch init behaviour per config.
|
||||
Keep `InitializeShouldThrow` + `InitializeCount` working exactly as before (used by other tests).
|
||||
|
||||
```csharp
|
||||
// Add to StubDriver:
|
||||
private readonly object _initConfigsLock = new();
|
||||
/// <summary>Every config string passed to <see cref="InitializeAsync"/>, in call order.</summary>
|
||||
public List<string> InitConfigs { get; } = new();
|
||||
/// <summary>Optional per-config init behaviour. When set, it fully owns the init outcome for that
|
||||
/// config (await/throw); <see cref="InitializeShouldThrow"/> is ignored. Null ⇒ legacy behaviour.</summary>
|
||||
public Func<string, Task>? InitBehavior { get; set; }
|
||||
|
||||
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
|
||||
{
|
||||
Interlocked.Increment(ref InitializeCount);
|
||||
lock (_initConfigsLock) InitConfigs.Add(driverConfigJson);
|
||||
if (InitBehavior is not null) { await InitBehavior(driverConfigJson); return; }
|
||||
if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail");
|
||||
}
|
||||
```
|
||||
|
||||
(Change `InitializeAsync` from the current synchronous `Task`-returning body to `async Task`; the
|
||||
`return Task.CompletedTask;` line goes away. `SubscribableStubDriver`/`WritableStubDriver` inherit
|
||||
unchanged.)
|
||||
|
||||
**Step 2: Write the failing headline test — Reconnecting adopts a corrected config and connects**
|
||||
|
||||
```csharp
|
||||
/// <summary>A driver stuck Reconnecting (init failing on a bad config) adopts a corrected config
|
||||
/// delivered via ApplyDelta and connects on it — no node restart. Closes pending.md #7.</summary>
|
||||
[Fact]
|
||||
public async Task ApplyDelta_while_Reconnecting_adopts_new_config_and_connects()
|
||||
{
|
||||
const string bad = "{\"v\":\"bad\"}";
|
||||
const string good = "{\"v\":\"good\"}";
|
||||
var driver = new SubscribableStubDriver
|
||||
{
|
||||
// Fail every init except the corrected config.
|
||||
InitBehavior = cfg => cfg == good ? Task.CompletedTask : throw new InvalidOperationException("bad-cfg"),
|
||||
};
|
||||
var parent = CreateTestProbe();
|
||||
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50)));
|
||||
|
||||
// A desired subscription so a Connected entry is observable via SubscribeCount.
|
||||
actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100)));
|
||||
|
||||
// Boot on the bad config → stuck Reconnecting (retries fail).
|
||||
actor.Tell(new DriverInstanceActor.InitializeRequested(bad));
|
||||
AwaitCondition(() => driver.InitializeCount >= 2, TimeSpan.FromSeconds(2)); // proven retrying
|
||||
driver.SubscribeCount.ShouldBe(0); // never connected
|
||||
|
||||
// Operator deploys the fix.
|
||||
var correlation = CorrelationId.NewId();
|
||||
var reply = await actor.Ask<DriverInstanceActor.ApplyResult>(
|
||||
new DriverInstanceActor.ApplyDelta(good, correlation), TimeSpan.FromSeconds(3));
|
||||
reply.Success.ShouldBeTrue();
|
||||
reply.Correlation.ShouldBe(correlation);
|
||||
|
||||
// It now connects on the corrected config and auto-subscribes.
|
||||
AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(3));
|
||||
driver.InitConfigs.ShouldContain(good);
|
||||
}
|
||||
```
|
||||
|
||||
**Step 3: Write the failing race test — generation guard drops a superseded in-flight result**
|
||||
|
||||
```csharp
|
||||
/// <summary>A stale InitializeSucceeded from an old (superseded) config cannot hijack the state:
|
||||
/// while a gated old-config init is pending in Connecting, an ApplyDelta adopts a new config; the
|
||||
/// old init completing afterwards is ignored, and only the new config drives Connected.</summary>
|
||||
[Fact]
|
||||
public async Task ApplyDelta_supersedes_in_flight_init_so_stale_result_is_ignored()
|
||||
{
|
||||
const string v1 = "{\"v\":1}";
|
||||
const string v2 = "{\"v\":2}";
|
||||
var gate1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
var gate2 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
var driver = new SubscribableStubDriver
|
||||
{
|
||||
InitBehavior = cfg => cfg == v1 ? gate1.Task : gate2.Task, // both succeed only when released
|
||||
};
|
||||
var parent = CreateTestProbe();
|
||||
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromSeconds(30)));
|
||||
actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100)));
|
||||
|
||||
// v1 init starts and blocks on gate1 → actor sits in Connecting.
|
||||
actor.Tell(new DriverInstanceActor.InitializeRequested(v1));
|
||||
AwaitCondition(() => driver.InitConfigs.Contains(v1), TimeSpan.FromSeconds(2));
|
||||
|
||||
// Adopt v2 while v1's init is still pending; v2 init starts and blocks on gate2.
|
||||
await actor.Ask<DriverInstanceActor.ApplyResult>(
|
||||
new DriverInstanceActor.ApplyDelta(v2, CorrelationId.NewId()), TimeSpan.FromSeconds(3));
|
||||
AwaitCondition(() => driver.InitConfigs.Contains(v2), TimeSpan.FromSeconds(2));
|
||||
|
||||
// Release the OLD (superseded) init. Its success must be ignored — actor stays Connecting.
|
||||
gate1.SetResult();
|
||||
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(400)); // no AttributeValuePublished / churn
|
||||
driver.SubscribeCount.ShouldBe(0); // did NOT connect on the stale v1
|
||||
|
||||
// Release the NEW init → connects on v2.
|
||||
gate2.SetResult();
|
||||
AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(3));
|
||||
}
|
||||
```
|
||||
|
||||
**Step 4: Run both new tests — verify they FAIL**
|
||||
|
||||
```bash
|
||||
dotnet test tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests \
|
||||
--filter "FullyQualifiedName~DriverInstanceActorTests.ApplyDelta_while_Reconnecting_adopts_new_config_and_connects|FullyQualifiedName~DriverInstanceActorTests.ApplyDelta_supersedes_in_flight_init_so_stale_result_is_ignored"
|
||||
```
|
||||
Expected: both FAIL today — `ApplyDelta` dead-letters in `Connecting`/`Reconnecting`, so the actor
|
||||
never connects and `SubscribeCount` stays 0 (Ask for `ApplyResult` also times out).
|
||||
|
||||
**Step 5: Implement the production change in `DriverInstanceActor.cs`**
|
||||
|
||||
(a) Field, near the other private fields (after `_currentConfigJson`):
|
||||
```csharp
|
||||
/// <summary>Monotonic token tagging each <see cref="InitializeAsync"/> attempt. An init result is
|
||||
/// honoured only when its generation matches the latest; an older result is from a superseded attempt
|
||||
/// (e.g. an <see cref="ApplyDelta"/> adopted a new config mid-(re)connect) and is dropped. Touched only
|
||||
/// on the actor thread, so no lock is needed.</summary>
|
||||
private int _initGeneration;
|
||||
```
|
||||
|
||||
(b) Tag the result records (top of the class, with the other record decls):
|
||||
```csharp
|
||||
public sealed record InitializeSucceeded(int Generation);
|
||||
public sealed record InitializeFailed(string Reason, int Generation);
|
||||
```
|
||||
|
||||
(c) `InitializeAsync` — bump + tag:
|
||||
```csharp
|
||||
private void InitializeAsync(string driverConfigJson)
|
||||
{
|
||||
_currentConfigJson = driverConfigJson;
|
||||
var generation = ++_initGeneration;
|
||||
var self = Self;
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await _driver.InitializeAsync(driverConfigJson, CancellationToken.None);
|
||||
self.Tell(new InitializeSucceeded(generation));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
self.Tell(new InitializeFailed(ex.Message, generation));
|
||||
}
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
(d) Guard the result handlers. In **`Connecting()`**, prepend the stale check to both:
|
||||
```csharp
|
||||
Receive<InitializeSucceeded>(msg =>
|
||||
{
|
||||
if (msg.Generation != _initGeneration) return; // superseded by a newer InitializeAsync
|
||||
_log.Info("DriverInstance {Id}: connected", _driverInstanceId);
|
||||
Become(Connected);
|
||||
PublishHealthSnapshot();
|
||||
ResubscribeDesired();
|
||||
AttachAlarmSource();
|
||||
});
|
||||
Receive<InitializeFailed>(msg =>
|
||||
{
|
||||
if (msg.Generation != _initGeneration) return; // superseded
|
||||
_log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason);
|
||||
RecordFault();
|
||||
Become(Reconnecting);
|
||||
PublishHealthSnapshot();
|
||||
});
|
||||
```
|
||||
In **`Reconnecting()`**, guard the success handler (the failed handler is already a no-op — leave it,
|
||||
optionally drop the now-unused `_` param name to `msg` for symmetry but keep it a no-op):
|
||||
```csharp
|
||||
Receive<InitializeSucceeded>(msg =>
|
||||
{
|
||||
if (msg.Generation != _initGeneration) return; // superseded
|
||||
Timers.Cancel("retry-connect");
|
||||
_log.Info("DriverInstance {Id}: reconnected", _driverInstanceId);
|
||||
Become(Connected);
|
||||
PublishHealthSnapshot();
|
||||
ResubscribeDesired();
|
||||
AttachAlarmSource();
|
||||
});
|
||||
```
|
||||
|
||||
(e) Add the adopt handler to **both** `Connecting()` and `Reconnecting()`. Put a private helper near
|
||||
`InitializeAsync` and wire `Receive<ApplyDelta>(AdoptConfigDuringInit);` into each behaviour:
|
||||
```csharp
|
||||
/// <summary>Adopt a new config while not connected: ApplyDelta in Connecting/Reconnecting re-inits
|
||||
/// immediately with the new config. <see cref="InitializeAsync"/> swaps <c>_currentConfigJson</c> and
|
||||
/// bumps the generation, so the in-flight (old-config) init is superseded and its result is dropped.
|
||||
/// The actor stays in its current state; the new init's result drives the next transition. In
|
||||
/// Reconnecting the retry timer is left running — if this immediate attempt fails it keeps retrying
|
||||
/// the new config (a redundant concurrent attempt is deduped by the generation guard).</summary>
|
||||
private void AdoptConfigDuringInit(ApplyDelta msg)
|
||||
{
|
||||
_log.Info("DriverInstance {Id}: ApplyDelta during (re)connect — adopting new config, re-initialising now",
|
||||
_driverInstanceId);
|
||||
InitializeAsync(msg.DriverConfigJson);
|
||||
Sender.Tell(new ApplyResult(true, "config adopted; reinitializing", msg.Correlation));
|
||||
}
|
||||
```
|
||||
|
||||
Do **not** change `Connected()`'s `ApplyDelta` (still `ReinitializeAsync` via
|
||||
`HandleApplyDeltaAsync`) or `Stubbed()`'s.
|
||||
|
||||
**Step 6: Run the new tests — verify they PASS**
|
||||
|
||||
```bash
|
||||
dotnet test tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests \
|
||||
--filter "FullyQualifiedName~DriverInstanceActorTests.ApplyDelta_while_Reconnecting_adopts_new_config_and_connects|FullyQualifiedName~DriverInstanceActorTests.ApplyDelta_supersedes_in_flight_init_so_stale_result_is_ignored"
|
||||
```
|
||||
Expected: both PASS.
|
||||
|
||||
**Step 7: Run the full `DriverInstanceActorTests` class + the Runtime suite — verify no regression**
|
||||
|
||||
```bash
|
||||
dotnet test tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests --filter "FullyQualifiedName~DriverInstanceActorTests"
|
||||
dotnet test tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests
|
||||
```
|
||||
Expected: all green. In particular `Initialize_failure_keeps_actor_in_Reconnecting_state` (the
|
||||
generation guard makes the latest-generation `InitializeFailed` honoured and stale ones ignored — both
|
||||
are no-ops, so `InitializeCount` still climbs ≥3) and
|
||||
`ApplyDelta_when_Connected_calls_ReinitializeAsync_and_replies_success` (Connected path untouched).
|
||||
|
||||
**Step 8: Commit (by path)**
|
||||
|
||||
```bash
|
||||
git add src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs \
|
||||
tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs
|
||||
git commit -m "fix(drivers): adopt corrected config via ApplyDelta while (re)connecting (#7)
|
||||
|
||||
A DriverInstanceActor stuck Reconnecting/Connecting now adopts a config delivered via ApplyDelta and
|
||||
re-initialises with it, instead of dead-lettering and retrying the stale config forever. A monotonic
|
||||
init generation supersedes the in-flight init so the corrected config always wins."
|
||||
```
|
||||
|
||||
**Acceptance criteria:**
|
||||
- [ ] `ApplyDelta` in `Connecting` and `Reconnecting` adopts the new config + replies `ApplyResult(true)`.
|
||||
- [ ] A superseded in-flight init result cannot transition the actor (generation guard).
|
||||
- [ ] `Connected`/`Stubbed` `ApplyDelta` paths unchanged; no host/contract/EF change.
|
||||
- [ ] New tests pass; full `Runtime.Tests` suite green.
|
||||
|
||||
---
|
||||
|
||||
### Task 2: Docs + bookkeeping
|
||||
|
||||
**Classification:** trivial
|
||||
**Estimated implement time:** ~2 min
|
||||
**Parallelizable with:** none (run after Task 1)
|
||||
|
||||
**Files:**
|
||||
- Modify: `pending.md` (**disk-only — never stage/commit**)
|
||||
- (No code/doc commits beyond the design+plan already committed.)
|
||||
|
||||
Done by the orchestrator, not a subagent. After Task 1 is merged-ready:
|
||||
- Update `pending.md`: mark open follow-up #7 done with the merge SHA; note the live `/run` gate is
|
||||
deferred (per user instruction this session). Do **not** `git add` it.
|
||||
- Confirm the full solution builds clean and the Runtime suite is green for the final integration
|
||||
review.
|
||||
|
||||
---
|
||||
|
||||
### Task 3: Live `/run` verification — DEFERRED (skipped this session)
|
||||
|
||||
**Classification:** n/a (user-driven)
|
||||
**Estimated implement time:** n/a
|
||||
|
||||
**Deferred per the user's instruction ("skip the live test part").** When run: with docker-dev up, put
|
||||
`MAIN-opcua-eq` into a faulted/`Reconnecting` state via a bad `DriverConfig`, deploy a corrected config
|
||||
(`POST http://localhost:9200/api/deployments`, header `X-Api-Key: docker-dev-deploy-key`), and confirm
|
||||
from central-1 logs that the driver **adopts the new config and connects without a node restart**
|
||||
(today it would retry the stale config forever). Do not perform this step now.
|
||||
|
||||
---
|
||||
|
||||
## Execution notes (hard rules for every implementer)
|
||||
|
||||
- Commit **on `feat/driver-reconfigure-while-faulted`**, never master. Stage **by path**, never `git add .`.
|
||||
- Never stage `sql_login.txt`, `src/Server/ZB.MOM.WW.OtOpcUa.Host/pki/`, `pending.md`, `current.md`,
|
||||
`docker-dev/docker-compose.yml`. Never echo/commit secrets. No force-push, no `--no-verify`.
|
||||
- No EF/Configuration/migration change. No bUnit.
|
||||
@@ -0,0 +1,10 @@
|
||||
{
|
||||
"planPath": "docs/plans/2026-06-14-driver-reconfigure-while-faulted-plan.md",
|
||||
"tasks": [
|
||||
{"id": 0, "subject": "Task 0: Feature branch (already created)", "status": "completed"},
|
||||
{"id": 1, "subject": "Task 1: Adopt corrected config mid-(re)connect (DriverInstanceActor)", "status": "pending", "blockedBy": [0]},
|
||||
{"id": 2, "subject": "Task 2: Docs + bookkeeping", "status": "pending", "blockedBy": [1]},
|
||||
{"id": 3, "subject": "Task 3: Live /run verification — DEFERRED (skipped)", "status": "pending", "blockedBy": [1]}
|
||||
],
|
||||
"lastUpdated": "2026-06-14"
|
||||
}
|
||||
Reference in New Issue
Block a user