docs(redundancy): Phase 2 implementation plan + tasks (H3 ServiceLevel wiring)
@@ -0,0 +1,564 @@
|
||||
# Phase 2 — Health-aware redundancy ServiceLevel (H3) Implementation Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:subagent-driven-development to implement this plan task-by-task.
|
||||
|
||||
**Goal:** Wire the existing-but-never-invoked `ServiceLevelCalculator` into the live per-node OPC UA publish
|
||||
path so a DB-unreachable / OPC-UA-unreachable node drops below its role-based ServiceLevel (and below a
|
||||
healthy peer), triggering client failover.
|
||||
|
||||
**Architecture:** Per-driver-node, inside `Runtime`. `OpcUaPublishActor` computes its own ServiceLevel from
|
||||
real local health (local `DbHealthProbeActor`, peer's-probe-of-me from the `redundancy-state` topic, the
|
||||
role snapshot, signal freshness) via `ServiceLevelCalculator.Compute`, replacing its role-only switch — with
|
||||
a **backward-compatible seam** (no DB-health source ⇒ legacy role-only). A new tiny `PeerProbeSupervisor`
|
||||
finally spawns `PeerOpcUaProbeActor` per driver peer. No admin-singleton change, no EF migration, no Commons
|
||||
message-contract change.
|
||||
|
||||
**Tech Stack:** C# / .NET 10, Akka.NET (ReceiveActor, DistributedPubSub, IWithTimers, Ask+PipeTo), xUnit +
|
||||
Shouldly + Akka.TestKit. Design: `docs/plans/2026-06-15-stillpending-phase-2-servicelevel-design.md`.
|
||||
|
||||
**Branch:** `feat/stillpending-phase-2-servicelevel` (already created off master `4bd7180e`; design doc
|
||||
committed `05283533`).
|
||||
|
||||
---
|
||||
|
||||
## Hard rules (every task)
|
||||
|
||||
- Stage **by path** — never `git add .`. Never stage `sql_login.txt`, `src/Server/.../Host/pki/`,
|
||||
`pending.md`, `current.md`, `docker-dev/docker-compose.yml`, `stillpending.md`. Never echo/commit secrets.
|
||||
No force-push, no `--no-verify`.
|
||||
- **NO** Configuration entity / EF migration. **NO** change to `OpcUaProbeResult` / `RedundancyStateChanged`
|
||||
/ `NodeRedundancyState` contracts (reuse as-is).
|
||||
- TDD fail-then-pass. xUnit + Shouldly. **NO bUnit.** Cross-node behavior proven only by live `/run`.
|
||||
- Production projects are `TreatWarningsAsErrors` — fix all warnings.
|
||||
|
||||
## Key facts the implementer needs
|
||||
|
||||
- `ServiceLevelCalculator.Compute(NodeHealthInputs)` is pure and **must not be reshaped**. Tiers:
|
||||
`MemberState not Up/Joining → 0`; `(DbReachable,OpcUaProbeOk,Stale)` → `(true,true,false)=240`,
|
||||
`(true,_,true)=200`, `(false,_,true)=100`, else `0`; `+10` if `IsDriverRoleLeader` (clamp 255).
|
||||
- `Runtime` already references `Core.Cluster` (`ZB.MOM.WW.OtOpcUa.Cluster.csproj`); `ControlPlane` too.
|
||||
`Runtime` does **NOT** reference `ControlPlane` and must not start to (deliberate boundary — see the
|
||||
comment in `PeerOpcUaProbeActor.cs:22`). So the calculator **moves** to `Core.Cluster` (Task 1).
|
||||
- `DbHealthProbeActor` (`Runtime/Health/`) answers `GetStatus` → `DbHealthStatus(bool Reachable, DateTime
|
||||
AsOfUtc, string? LastError)` from cache (cheap). Already spawned in `WithOtOpcUaRuntimeActors` as the local
|
||||
`dbHealth` `IActorRef`.
|
||||
- `PeerOpcUaProbeActor` (`Runtime/Health/`) publishes `OpcUaProbeResult(NodeId peer, bool Ok)` to the
|
||||
`redundancy-state` topic. `Props(peer, interval?, connectTimeout?, opcUaPort=4840, broadcast?)`.
|
||||
- `OpcUaPublishActor` subscribes to the `redundancy-state` topic in PreStart, so it already receives every
|
||||
message on it — adding `Receive<OpcUaProbeResult>` makes those visible with no extra subscribe.
|
||||
- Test base `RuntimeActorTestBase` self-joins a single-node `driver` cluster →
`Cluster.Get(Sys).SelfMember.Status == Up` works in tests. The two existing tests in `OpcUaPublishActorTests.cs`
|
||||
(`..._primary_leader` → 240, `..._for_secondary_publishes_100` → 100) inject **no** `dbHealthProbe`, so the
|
||||
legacy seam keeps them green unchanged.
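
The tier list in the first bullet can be restated as a small pure function. This is a reading aid only, not the real `ServiceLevelCalculator`: `MemberStateSketch` and `ServiceLevelTierSketch` are hypothetical names, and the sketch assumes the +10 leader bonus is applied to every non-zero tier, which the plan does not state explicitly. The shipped `Compute` stays authoritative.

```csharp
using System;

// Hypothetical stand-in for Akka.Cluster.MemberStatus so the sketch compiles on its own.
public enum MemberStateSketch { Joining, Up, Leaving, Down, Removed }

public static class ServiceLevelTierSketch
{
    // Restates the tier list above. NOT the real ServiceLevelCalculator.Compute.
    public static byte Compute(MemberStateSketch state, bool dbReachable, bool opcUaProbeOk,
                               bool stale, bool isDriverRoleLeader)
    {
        // Any member state other than Up/Joining publishes 0.
        if (state != MemberStateSketch.Up && state != MemberStateSketch.Joining) return 0;

        int tier = (dbReachable, opcUaProbeOk, stale) switch
        {
            (true, true, false) => 240, // fully healthy
            (true, _, true)     => 200, // DB reachable but signals stale
            (false, _, true)    => 100, // DB unreachable (always stale)
            _                   => 0,   // everything else
        };

        // ASSUMPTION: the leader bonus applies only when the tier is non-zero.
        int bonus = isDriverRoleLeader && tier > 0 ? 10 : 0;
        return (byte)Math.Min(tier + bonus, 255); // clamp to the byte range
    }
}
```

Under these assumptions a healthy follower reads 240 and a healthy leader 250, which matches the healthy pair the later tasks expect.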
|
||||
|
||||
---
|
||||
|
||||
### Task 1: Move `ServiceLevelCalculator` to `Core.Cluster` (shared, cycle-free)
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~3 min
|
||||
**Parallelizable with:** none (Task 2 depends on this)
|
||||
|
||||
**Files:**
|
||||
- Create: `src/Core/ZB.MOM.WW.OtOpcUa.Cluster/Redundancy/ServiceLevelCalculator.cs`
|
||||
- Delete: `src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/ServiceLevelCalculator.cs`
|
||||
- Modify: `tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ServiceLevelCalculatorTests.cs` (using/namespace)
|
||||
|
||||
**Step 1: Verify `Core.Cluster` references `Akka.Cluster`** (the calculator uses `MemberStatus`):
|
||||
Run: `grep -rn "Akka.Cluster" src/Core/ZB.MOM.WW.OtOpcUa.Cluster/ZB.MOM.WW.OtOpcUa.Cluster.csproj && grep -rln "using Akka.Cluster" src/Core/ZB.MOM.WW.OtOpcUa.Cluster/`
|
||||
Expected: a PackageReference (or transitive) to `Akka.Cluster` and existing `using Akka.Cluster;` files
|
||||
(`ClusterRoleInfo` uses it). If the package is **not** referenced there, STOP and surface it — fall back to
|
||||
adding `Akka.Cluster` as a `PackageReference` on `Core.Cluster` (it already depends on Akka cluster types) or
|
||||
placing the file in `Commons` only if `Commons` references `Akka.Cluster`. Prefer `Core.Cluster`.
|
||||
|
||||
**Step 2: Move the type verbatim**, changing only the namespace:
|
||||
`namespace ZB.MOM.WW.OtOpcUa.Cluster.Redundancy;` (keep `using Akka.Cluster;`, keep `NodeHealthInputs` +
|
||||
`ServiceLevelCalculator` together, keep all XML docs). Delete the old ControlPlane file.
|
||||
|
||||
**Step 3: Fix references.** `RedundancyStateActor.cs` does not call the calculator (only doc-comment
|
||||
mentions) — update the doc-comment text if it names the old namespace, no `using` needed. Update the test:
|
||||
```csharp
|
||||
// tests/.../ServiceLevelCalculatorTests.cs
|
||||
using ZB.MOM.WW.OtOpcUa.Cluster.Redundancy; // was ZB.MOM.WW.OtOpcUa.ControlPlane.Redundancy
|
||||
```
|
||||
Also `grep -rn "ControlPlane.Redundancy.ServiceLevelCalculator\|ControlPlane.Redundancy;.*ServiceLevel\|NodeHealthInputs" src tests` and fix any stragglers (e.g. `OpcUaPublishActor.cs`'s `<see cref>` doc-comment, `OtOpcUaConfigDbContext.cs:159` comment text — comment-only, update the prose).
|
||||
|
||||
**Step 4: Build + run the moved test.**
|
||||
Run: `dotnet build ZB.MOM.WW.OtOpcUa.slnx` then
|
||||
`dotnet test tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests --filter "FullyQualifiedName~ServiceLevelCalculator"`
|
||||
Expected: build clean; `ServiceLevelCalculatorTests` PASS (unchanged behavior, new namespace).
|
||||
|
||||
**Step 5: Commit**
|
||||
```bash
|
||||
git add src/Core/ZB.MOM.WW.OtOpcUa.Cluster/Redundancy/ServiceLevelCalculator.cs \
|
||||
src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/ServiceLevelCalculator.cs \
|
||||
tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ServiceLevelCalculatorTests.cs
|
||||
# plus any comment-only files touched in Step 3, by path
|
||||
git commit -m "refactor(redundancy): move ServiceLevelCalculator to Core.Cluster (shared, Runtime-reachable)"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Task 2a: `OpcUaPublishActor` calculator path — DB + Stale + leader + Detached guard (legacy seam)
|
||||
|
||||
**Classification:** high-risk
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none (Task 1 → 2a → 2b → 3 are serial on the same file)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs`
|
||||
- Test: `tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs`
|
||||
|
||||
**Design contract:** `RecomputeServiceLevel()` resolves the local snapshot entry; **no entry or
|
||||
`Role==Detached` ⇒ publish `0`**. If **`_dbHealthProbe is null` OR `_lastDbHealth is null`** (no DB-health
|
||||
source yet) ⇒ **legacy role-only byte** (the old switch — Primary-leader 240 / Primary 200 / Secondary 100 /
|
||||
Detached 0) — this is the bootstrap + back-compat path. Otherwise build `NodeHealthInputs` and call
|
||||
`ServiceLevelCalculator.Compute`. `MemberState` ← `Cluster.Get(Context.System).SelfMember.Status`.
|
||||
`Stale = !DbReachable || (now - _lastDbHealth.AsOfUtc) > _staleWindow || (now - localEntry.AsOfUtc) > _staleWindow`.
|
||||
`OpcUaProbeOk` is hardcoded `true` in this task (Task 2b wires the real value).
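
The `Stale` rule in the contract can be checked in isolation. A minimal sketch, assuming the clock is passed in rather than read from `DateTime.UtcNow`; `StalenessSketch` and `IsStale` are hypothetical names, not part of the actor:

```csharp
using System;

public static class StalenessSketch
{
    // Stale when the DB is unreachable, or when either the cached DB-health reading
    // or the local snapshot entry is older than the stale window.
    public static bool IsStale(bool dbReachable, DateTime dbAsOfUtc, DateTime entryAsOfUtc,
                               DateTime nowUtc, TimeSpan staleWindow) =>
        !dbReachable
        || (nowUtc - dbAsOfUtc) > staleWindow
        || (nowUtc - entryAsOfUtc) > staleWindow;
}
```

With a 30 s window, a reachable DB and both timestamps equal to `nowUtc` give `false`; an entry one minute old gives `true`, and an unreachable DB gives `true` regardless of timestamps. Injecting the clock is only a convenience for the sketch; the actor itself reads `DateTime.UtcNow`.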
|
||||
|
||||
**Step 1: Write failing tests** (add to `OpcUaPublishActorTests.cs`). A tiny stub DB-health probe actor +
|
||||
calculator-path assertions:
|
||||
```csharp
|
||||
using Akka.Actor;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Health; // DbHealthProbeActor.GetStatus / DbHealthStatus
|
||||
|
||||
// helper at class scope:
|
||||
private sealed class StubDbHealth : ReceiveActor
|
||||
{
|
||||
public StubDbHealth(DbHealthProbeActor.DbHealthStatus status) =>
|
||||
Receive<DbHealthProbeActor.GetStatus>(_ => Sender.Tell(status));
|
||||
}
|
||||
|
||||
[Fact] // healthy primary-leader → 250
|
||||
public void Calculator_path_healthy_primary_leader_publishes_250()
|
||||
{
|
||||
var publisher = new RecordingPublisher();
|
||||
var local = NodeId.Parse("primary-node");
|
||||
var db = Sys.ActorOf(Props.Create(() => new StubDbHealth(
|
||||
new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null))));
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
|
||||
serviceLevel: publisher, localNode: local, dbHealthProbe: db,
|
||||
staleWindow: TimeSpan.FromSeconds(30)));
|
||||
|
||||
// Seed DB-health directly (deterministic, no timer): Receive<DbHealthStatus> caches + recomputes.
|
||||
actor.Tell(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null));
|
||||
actor.Tell(new RedundancyStateChanged(new[]
|
||||
{
|
||||
new NodeRedundancyState(local, RedundancyRole.Primary, true, true, DateTime.UtcNow),
|
||||
}, CorrelationId.NewId()));
|
||||
|
||||
AwaitAssert(() => publisher.Levels.ShouldContain((byte)250), TimeSpan.FromMilliseconds(500));
|
||||
}
|
||||
|
||||
[Fact] // healthy secondary → 240 (NOT 100 — documented change)
|
||||
public void Calculator_path_healthy_secondary_publishes_240() { /* Role=Secondary, IsRoleLeader=false → 240 */ }
|
||||
|
||||
[Fact] // DB unreachable → 100 (Stale via !DbReachable)
|
||||
public void Calculator_path_db_unreachable_publishes_100() { /* DbHealthStatus(false,...) + Primary-leader → 100 */ }
|
||||
|
||||
[Fact] // stale snapshot but DB ok → 200
|
||||
public void Calculator_path_stale_snapshot_publishes_200() { /* DbHealthStatus(true, now) but entry AsOfUtc = now-1min, staleWindow=2s, IsRoleLeaderForDriver:false (stale follower, see note) → 200 */ }
|
||||
|
||||
[Fact] // Detached → 0
|
||||
public void Calculator_path_detached_publishes_0() { /* local entry Role=Detached → 0 */ }
|
||||
|
||||
[Fact] // back-compat: no dbHealthProbe → legacy role-only (existing two tests already cover 240 & 100)
|
||||
public void Legacy_path_when_no_db_probe_keeps_role_only() { /* no dbHealthProbe → Primary-leader → 240 */ }
|
||||
```
|
||||
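NOTE for the 200 test: with `IsDriverRoleLeader=true`, `Compute` adds +10 → 210. Either assert 210 for a
leader, or build the 200 case with `IsRoleLeaderForDriver:false` (a stale follower) → exactly 200. Use a
**stale follower** so the expected byte is unambiguous (200). The 100 case above is written for a
Primary-leader; if `Compute` applies the +10 in that tier as well, a leader reads 110, so check the calculator
first and either assert 110 or use a follower there too.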
|
||||
|
||||
**Step 2: Run, expect FAIL** (`dbHealthProbe`/`staleWindow` params don't exist; calculator not called):
|
||||
Run: `dotnet test tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests --filter "FullyQualifiedName~OpcUaPublishActorTests.Calculator_path"`
|
||||
|
||||
**Step 3: Implement.** In `OpcUaPublishActor.cs`:
|
||||
- Add `using ZB.MOM.WW.OtOpcUa.Cluster.Redundancy;` and `using ZB.MOM.WW.OtOpcUa.Runtime.Health;` and
|
||||
`using Akka.Cluster;`.
|
||||
- Add Props/PropsForTests params (all optional, defaulted): `IActorRef? dbHealthProbe = null`,
|
||||
`TimeSpan? staleWindow = null` (default e.g. 30 s). Thread through both factory methods + the ctor.
|
||||
- Add fields: `IActorRef? _dbHealthProbe; TimeSpan _staleWindow; DbHealthProbeActor.DbHealthStatus? _lastDbHealth;
|
||||
RedundancyStateChanged? _lastSnapshot; Cluster? _cluster;` (resolve `_cluster = Cluster.Get(Context.System)`
|
||||
lazily/in ctor — safe; tests form a cluster).
|
||||
- `Receive<DbHealthProbeActor.DbHealthStatus>(s => { _lastDbHealth = s; RecomputeServiceLevel(); });`
|
||||
- Replace `HandleRedundancyStateChanged` body with `{ _lastSnapshot = msg; RecomputeServiceLevel(); }`.
|
||||
- New `RecomputeServiceLevel()`:
|
||||
```csharp
|
||||
private void RecomputeServiceLevel()
|
||||
{
|
||||
if (_localNode is null || _lastSnapshot is null) return;
|
||||
var entry = _lastSnapshot.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value);
|
||||
if (entry is null || entry.Role == RedundancyRole.Detached) { Self.Tell(new ServiceLevelChanged(0)); return; }
|
||||
|
||||
// Back-compat / bootstrap: no DB-health source → legacy role-only (keeps existing tests + a sane
|
||||
// value until the first DbHealthStatus arrives).
|
||||
if (_dbHealthProbe is null || _lastDbHealth is null) { Self.Tell(new ServiceLevelChanged(LegacyRoleOnly(entry))); return; }
|
||||
|
||||
var now = DateTime.UtcNow;
|
||||
var stale = !_lastDbHealth.Reachable
|
||||
|| (now - _lastDbHealth.AsOfUtc) > _staleWindow
|
||||
|| (now - entry.AsOfUtc) > _staleWindow;
|
||||
var inputs = new NodeHealthInputs(
|
||||
MemberState: SafeSelfStatus(),
|
||||
DbReachable: _lastDbHealth.Reachable,
|
||||
OpcUaProbeOk: true, // Task 2b replaces with the freshness-debounced value
|
||||
Stale: stale,
|
||||
IsDriverRoleLeader: entry.IsRoleLeaderForDriver);
|
||||
Self.Tell(new ServiceLevelChanged(ServiceLevelCalculator.Compute(inputs)));
|
||||
}
|
||||
|
||||
private static byte LegacyRoleOnly(NodeRedundancyState e) => e.Role switch
|
||||
{
|
||||
RedundancyRole.Primary when e.IsRoleLeaderForDriver => 240,
|
||||
RedundancyRole.Primary => 200,
|
||||
RedundancyRole.Secondary => 100,
|
||||
_ => 0,
|
||||
};
|
||||
|
||||
private MemberStatus SafeSelfStatus()
|
||||
{
|
||||
try { return _cluster!.SelfMember.Status; } catch { return MemberStatus.Removed; }
|
||||
}
|
||||
```
|
||||
(The `Self.Tell(ServiceLevelChanged)` reuses the existing dedup/publish/metric handler — do not publish
|
||||
directly.)
|
||||
|
||||
**Step 4: Run, expect PASS** (the new `Calculator_path*` tests + the two pre-existing role-only tests).
|
||||
Run: `dotnet test tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests --filter "FullyQualifiedName~OpcUaPublishActorTests"`
|
||||
Expected: all green, including the unchanged `..._primary_leader` (240) and `..._secondary_publishes_100`.
|
||||
|
||||
**Step 5: Commit** (stage `OpcUaPublishActor.cs` + the test file by path).
|
||||
`git commit -m "feat(redundancy): OpcUaPublishActor computes ServiceLevel via calculator (DB+stale+leader; legacy seam)"`
|
||||
|
||||
---
|
||||
|
||||
### Task 2b: `OpcUaProbeOk` from peer-probes-me (freshness + debounce)
|
||||
|
||||
**Classification:** high-risk
|
||||
**Estimated implement time:** ~4 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs`
|
||||
- Test: `tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs`
|
||||
|
||||
**Contract:** `OpcUaProbeOk()` = `true` if **no fresh** `OpcUaProbeResult` about the local node exists, else
|
||||
the latest such result's `Ok` (only an actively-observed, recent `Ok==false` demotes). "Fresh" = within
|
||||
`_probeFreshnessWindow` (injectable, default e.g. 30 s). Single-node / no peer → always `true`.
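
The contract above ("benefit of the doubt" unless a recent `Ok==false` exists) can be exercised without an actor. A minimal sketch, assuming an injected clock; `ProbeVerdictSketch`, `Record` and `IsOk` are hypothetical names:

```csharp
using System;

public sealed class ProbeVerdictSketch
{
    private readonly TimeSpan _freshnessWindow;
    private (bool Ok, DateTime At)? _latest; // latest peer verdict about this node

    public ProbeVerdictSketch(TimeSpan freshnessWindow) => _freshnessWindow = freshnessWindow;

    // Each new verdict replaces the previous one, so a later Ok:true supersedes an earlier Ok:false.
    public void Record(bool ok, DateTime atUtc) => _latest = (ok, atUtc);

    public bool IsOk(DateTime nowUtc)
    {
        if (_latest is not { } v) return true;              // no verdict yet
        if (nowUtc - v.At > _freshnessWindow) return true;  // verdict too old to trust
        return v.Ok;                                        // fresh verdict decides
    }
}
```

With a 30 s window, a `false` recorded at `t0` demotes at `t0 + 5 s` but no longer at `t0 + 31 s`, and a `true` recorded afterwards restores `IsOk` immediately.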
|
||||
|
||||
**Step 1: Failing tests:**
|
||||
```csharp
|
||||
[Fact] // peer reports me unreachable, DB ok + fresh → (true,false,false) → 0
|
||||
public void Probe_false_about_me_with_healthy_db_publishes_0() {
|
||||
// inject dbHealthProbe(true) + staleWindow 30s + probeFreshnessWindow 30s
|
||||
// Tell DbHealthStatus(true, now); Tell new OpcUaProbeResult(local, Ok:false);
|
||||
// Tell RedundancyStateChanged(primary-leader) → AwaitAssert Levels contains 0
|
||||
}
|
||||
[Fact] // absent/stale probe result → OpcUaProbeOk true → healthy 250 (don't penalize missing peer)
|
||||
public void No_probe_result_defaults_ok_true_publishes_250() { /* never Tell OpcUaProbeResult → 250 */ }
|
||||
[Fact] // a later Ok:true supersedes an earlier Ok:false (recovery)
|
||||
public void Probe_true_supersedes_earlier_false() { /* Tell false then true → recompute → 250 */ }
|
||||
```
|
||||
Ignore `OpcUaProbeResult` whose `NodeId != _localNode` (it's about a peer, not me).
|
||||
|
||||
**Step 2: Run, expect FAIL.**
|
||||
|
||||
**Step 3: Implement.** Add `Receive<OpcUaProbeResult>(HandlePeerProbe);` and:
|
||||
```csharp
|
||||
private (bool Ok, DateTime At)? _probeAboutMe; // latest peer verdict on THIS node
|
||||
private TimeSpan _probeFreshnessWindow;
|
||||
|
||||
private void HandlePeerProbe(PeerOpcUaProbeActor.OpcUaProbeResult r)
|
||||
{
|
||||
if (_localNode is null || r.NodeId != _localNode.Value) return; // only verdicts about me
|
||||
_probeAboutMe = (r.Ok, DateTime.UtcNow);
|
||||
RecomputeServiceLevel();
|
||||
}
|
||||
|
||||
private bool OpcUaProbeOk()
|
||||
{
|
||||
if (_probeAboutMe is not { } p) return true; // no peer verdict → benefit of doubt
|
||||
if (DateTime.UtcNow - p.At > _probeFreshnessWindow) return true; // stale verdict → benefit of doubt
|
||||
return p.Ok;
|
||||
}
|
||||
```
|
||||
Replace `OpcUaProbeOk: true` in `RecomputeServiceLevel` with `OpcUaProbeOk: OpcUaProbeOk()`. Add the
`probeFreshnessWindow` Props/PropsForTests param + ctor wiring (default ~30 s). No new `using` is needed:
`using ZB.MOM.WW.OtOpcUa.Runtime.Health;` was already added in Task 2a.
|
||||
|
||||
**Step 4: Run, expect PASS** (new probe tests + all Task 2a + the two legacy tests).
|
||||
|
||||
**Step 5: Commit** by path.
|
||||
`git commit -m "feat(redundancy): OpcUaProbeOk from peer-probes-me with freshness debounce"`
|
||||
|
||||
---
|
||||
|
||||
### Task 3: `HealthTick` — periodic DB Ask/PipeTo + immediate PreStart refresh
|
||||
|
||||
**Classification:** high-risk
|
||||
**Estimated implement time:** ~4 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs`
|
||||
- Test: `tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs`
|
||||
|
||||
**Contract:** the actor refreshes DB health on its own so production needs no external pump. Implement
|
||||
`IWithTimers`. In `PreStart` (only when `_dbHealthProbe is not null`): kick an immediate tick + a periodic
|
||||
timer (`_healthTickInterval`, injectable, default 5 s). `Receive<HealthTick>` →
`_dbHealthProbe.Ask<DbHealthProbeActor.DbHealthStatus>(DbHealthProbeActor.GetStatus.Instance, TimeSpan.FromSeconds(1)).PipeTo(Self)`; on
|
||||
Ask failure (timeout) PipeTo a `DbHealthStatus(false, now, "ask-timeout")` (fail-safe demote). The existing
|
||||
`Receive<DbHealthStatus>` caches + recomputes.
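
The fail-safe mapping in this contract can be sketched with plain tasks, assuming the Ask is just a `Task<T>` that either completes or faults on timeout. `DbHealthSketch`, `AskFallbackSketch` and `OrUnreachable` are hypothetical names, and no Akka types are involved:

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for DbHealthProbeActor.DbHealthStatus.
public sealed record DbHealthSketch(bool Reachable, DateTime AsOfUtc, string? LastError);

public static class AskFallbackSketch
{
    // Maps a faulted or cancelled ask to an "unreachable" status so the caller always
    // receives a DbHealthSketch and a failed ask demotes instead of being dropped.
    public static Task<DbHealthSketch> OrUnreachable(Task<DbHealthSketch> ask, Func<DateTime> utcNow) =>
        ask.ContinueWith(
            t => t.IsCompletedSuccessfully
                ? t.Result
                : new DbHealthSketch(false, utcNow(), "ask-timeout"),
            TaskContinuationOptions.ExecuteSynchronously);
}
```

A successful ask passes its result through unchanged; a faulted one yields `Reachable == false` with `LastError == "ask-timeout"`. The continuation never touches `t.Result` on a failed task, so the original exception is not rethrown.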
|
||||
|
||||
**Step 1: Failing test** — proves the Ask path without the test Telling `DbHealthStatus` directly:
|
||||
```csharp
|
||||
[Fact]
|
||||
public void HealthTick_asks_db_probe_and_publishes_calculator_byte()
|
||||
{
|
||||
var publisher = new RecordingPublisher();
|
||||
var local = NodeId.Parse("primary-node");
|
||||
var db = Sys.ActorOf(Props.Create(() => new StubDbHealth(
|
||||
new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null))));
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
|
||||
serviceLevel: publisher, localNode: local, dbHealthProbe: db,
|
||||
staleWindow: TimeSpan.FromSeconds(30), probeFreshnessWindow: TimeSpan.FromSeconds(30),
|
||||
healthTickInterval: TimeSpan.FromMilliseconds(100)));
|
||||
|
||||
actor.Tell(new RedundancyStateChanged(new[]
|
||||
{
|
||||
new NodeRedundancyState(local, RedundancyRole.Primary, true, true, DateTime.UtcNow),
|
||||
}, CorrelationId.NewId()));
|
||||
|
||||
// No direct DbHealthStatus Tell — the periodic HealthTick Ask must populate it → 250.
|
||||
AwaitAssert(() => publisher.Levels.ShouldContain((byte)250), TimeSpan.FromSeconds(2));
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run, expect FAIL** (`healthTickInterval` param missing; no tick wired).
|
||||
|
||||
**Step 3: Implement.** `public sealed class OpcUaPublishActor : ReceiveActor, IWithTimers`; add
|
||||
`public ITimerScheduler Timers { get; set; } = null!;`. Add `record HealthTick` singleton. Add the
|
||||
`healthTickInterval` param to both Props factories + ctor (store `_healthTickInterval`). Extend `PreStart`:
|
||||
```csharp
|
||||
protected override void PreStart()
|
||||
{
|
||||
if (_subscribeRedundancyTopic)
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(RedundancyStateTopic, Self));
|
||||
if (_dbHealthProbe is not null)
|
||||
{
|
||||
Self.Tell(HealthTick.Instance); // immediate refresh
|
||||
Timers.StartPeriodicTimer("health", HealthTick.Instance, _healthTickInterval);
|
||||
}
|
||||
}
|
||||
```
|
||||
`Receive<HealthTick>(_ => _dbHealthProbe!.Ask<DbHealthProbeActor.DbHealthStatus>(
|
||||
DbHealthProbeActor.GetStatus.Instance, TimeSpan.FromSeconds(1))
|
||||
.ContinueWith(t => t.IsCompletedSuccessfully ? t.Result
|
||||
: new DbHealthProbeActor.DbHealthStatus(false, DateTime.UtcNow, "ask-timeout"))
|
||||
.PipeTo(Self));`
|
||||
|
||||
**Step 4: Run, expect PASS** — the HealthTick test + all prior tests. Verify the pinned-dispatcher production
|
||||
`Props()` still builds (it does not set `dbHealthProbe`, so PreStart skips the timer in pure-Props tests).
|
||||
|
||||
**Step 5: Commit** by path.
|
||||
`git commit -m "feat(redundancy): periodic HealthTick refreshes DB reachability via Ask/PipeTo"`
|
||||
|
||||
---
|
||||
|
||||
### Task 4: `PeerProbeSupervisor` — spawn one probe per driver peer
|
||||
|
||||
**Classification:** high-risk
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** Task 6 (docs) — disjoint files
|
||||
|
||||
**Files:**
|
||||
- Create: `src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/PeerProbeSupervisor.cs`
|
||||
- Test: `tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/PeerProbeSupervisorTests.cs`
|
||||
|
||||
**Contract:** subscribes to the `redundancy-state` topic; on each `RedundancyStateChanged`, maintain exactly
|
||||
one child per **other, non-Detached** node (`n.NodeId != _localNode && n.Role != Detached`): spawn for new
|
||||
peers, `Context.Stop` children for departed peers. Child `Props` come from an injected
|
||||
`Func<NodeId, Props>` (production default `peer => PeerOpcUaProbeActor.Props(peer, opcUaPort: _opcUaPort)`).
|
||||
Child actor name = a stable sanitization of the peer node id (so re-adds match). Default dispatcher (NOT the
|
||||
pinned OPC UA one).
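
The reconcile rule in this contract is a set difference between the peers a snapshot wants and the children currently running. A minimal sketch over plain strings, assuming node ids compare by value; `PeerReconcileSketch` and `Diff` are hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PeerReconcileSketch
{
    // Returns which peers need a new child and which running children should be stopped.
    public static (List<string> ToStart, List<string> ToStop) Diff(
        IEnumerable<(string NodeId, bool Detached)> snapshot,
        string localNode,
        IReadOnlyCollection<string> running)
    {
        // Wanted = every other node that is not Detached.
        var want = snapshot.Where(n => n.NodeId != localNode && !n.Detached)
                           .Select(n => n.NodeId)
                           .ToHashSet();

        var toStop = running.Where(r => !want.Contains(r))
                            .OrderBy(x => x, StringComparer.Ordinal).ToList();
        var toStart = want.Where(w => !running.Contains(w))
                          .OrderBy(x => x, StringComparer.Ordinal).ToList();
        return (toStart, toStop);
    }
}
```

For a snapshot of `me:4053` (local), `peer:4053` (Secondary) and `adm:4053` (Detached) with one stale child `old:4053` running, the result is start `peer:4053` and stop `old:4053`. Applying the same snapshot again yields two empty lists, which is what keeps the supervisor at exactly one child per peer.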
|
||||
|
||||
**Step 1: Failing tests** (drive lifecycle with fabricated snapshots + a stub probe factory; assert child
|
||||
count via `ActorCells`/a TestProbe registry):
|
||||
```csharp
|
||||
public sealed class PeerProbeSupervisorTests : RuntimeActorTestBase
|
||||
{
|
||||
private static Props StubChild() => Props.Create(() => new BlackHoleActor()); // or EchoActor
|
||||
|
||||
[Fact]
|
||||
public void Spawns_one_child_per_non_self_non_detached_peer()
|
||||
{
|
||||
var local = NodeId.Parse("me:4053");
|
||||
var sup = ActorOfAsTestActorRef<PeerProbeSupervisor>(
|
||||
PeerProbeSupervisor.PropsForTests(local, _ => StubChild()));
|
||||
sup.Tell(new RedundancyStateChanged(new[]
|
||||
{
|
||||
new NodeRedundancyState(local, RedundancyRole.Primary, true, true, DateTime.UtcNow),
|
||||
new NodeRedundancyState(NodeId.Parse("peer:4053"), RedundancyRole.Secondary, false, false, DateTime.UtcNow),
|
||||
new NodeRedundancyState(NodeId.Parse("adm:4053"), RedundancyRole.Detached, false, false, DateTime.UtcNow),
|
||||
}, CorrelationId.NewId()));
|
||||
AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(1)); // peer only; self + detached excluded
|
||||
}
|
||||
|
||||
[Fact] public void Stops_child_for_departed_peer() { /* snapshot with peer, then without → ChildCount 1→0 */ }
|
||||
[Fact] public void Single_node_snapshot_spawns_no_children() { /* only local entry → 0 */ }
|
||||
}
|
||||
```
|
||||
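Expose a test-visible `ChildCount` (e.g. `public int ChildCount => _children.Count;`, read from the actor's own
dictionary: `Context` is only valid while the actor is processing a message, so `Context.GetChildren()` called
from the test thread throws) OR assert via `ExpectMsg` on a registry. Prefer `ChildCount` with `ActorOfAsTestActorRef`.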
|
||||
|
||||
**Step 2: Run, expect FAIL** (type doesn't exist).
|
||||
|
||||
**Step 3: Implement** `PeerProbeSupervisor : ReceiveActor`:
|
||||
```csharp
|
||||
public static Props Props(NodeId localNode, int opcUaPort = PeerOpcUaProbeActor.DefaultOpcUaPort) =>
|
||||
Akka.Actor.Props.Create(() => new PeerProbeSupervisor(localNode,
|
||||
peer => PeerOpcUaProbeActor.Props(peer, opcUaPort: opcUaPort)));
|
||||
public static Props PropsForTests(NodeId localNode, Func<NodeId, Props> probeFactory) =>
|
||||
Akka.Actor.Props.Create(() => new PeerProbeSupervisor(localNode, probeFactory));
|
||||
|
||||
private readonly Dictionary<NodeId, IActorRef> _children = new();
|
||||
// ctor: store localNode + probeFactory; Receive<RedundancyStateChanged>(OnSnapshot); Receive<SubscribeAck>(_=>{});
|
||||
protected override void PreStart() =>
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe("redundancy-state", Self));
|
||||
|
||||
private void OnSnapshot(RedundancyStateChanged s)
|
||||
{
|
||||
var want = s.Nodes.Where(n => n.NodeId != _localNode && n.Role != RedundancyRole.Detached)
|
||||
.Select(n => n.NodeId).ToHashSet();
|
||||
foreach (var gone in _children.Keys.Where(k => !want.Contains(k)).ToList())
|
||||
{ Context.Stop(_children[gone]); _children.Remove(gone); }
|
||||
foreach (var peer in want.Where(p => !_children.ContainsKey(p)))
|
||||
_children[peer] = Context.ActorOf(_probeFactory(peer), "probe-" + Sanitize(peer.Value));
|
||||
}
|
||||
private static string Sanitize(string s) => new(s.Select(c => char.IsLetterOrDigit(c) ? c : '-').ToArray());
|
||||
```
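
The child-name rule from the block above, pulled out so it can be checked on its own. `ProbeNameSketch` and `ChildName` are hypothetical wrappers; the `Sanitize` body is the one from the plan:

```csharp
using System;
using System.Linq;

public static class ProbeNameSketch
{
    // Replaces every character that is not a letter or digit with '-'.
    public static string Sanitize(string s) =>
        new string(s.Select(c => char.IsLetterOrDigit(c) ? c : '-').ToArray());

    // Stable child name for a peer, so a re-added peer maps to the same name.
    public static string ChildName(string peerNodeId) => "probe-" + Sanitize(peerNodeId);
}
```

`ChildName("peer:4053")` gives `probe-peer-4053`. Two caveats worth checking during implementation: the mapping is not injective (`a:b` and `a-b` sanitize to the same name, and a duplicate child name would be rejected), and `char.IsLetterOrDigit` also accepts non-ASCII letters, which may not be valid in actor names.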
|
||||
(The topic must be `"redundancy-state"`, the value of `PeerOpcUaProbeActor.RedundancyStateTopic`. Keep the Runtime
"duplicated to avoid a ControlPlane ref" convention, but do not add yet another copy: the literal in the sketch
above can be replaced by `PeerOpcUaProbeActor.RedundancyStateTopic`, since both types live in `Runtime.Health`.)
|
||||
|
||||
**Step 4: Run, expect PASS.**
|
||||
|
||||
**Step 5: Commit** by path.
|
||||
`git commit -m "feat(redundancy): PeerProbeSupervisor maintains one peer OPC UA probe per driver peer"`
|
||||
|
||||
---
|
||||
|
||||
### Task 5: Wire into `WithOtOpcUaRuntimeActors`
|
||||
|
||||
**Classification:** high-risk
|
||||
**Estimated implement time:** ~3 min
|
||||
**Parallelizable with:** none (depends on 2a–4)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs`
|
||||
|
||||
**Step 1: Implement** (no new unit test — DI wiring is covered by the live `/run` gate + existing
|
||||
`ServiceCollectionExtensionsTests` smoke if present; if that test asserts a fixed actor set, update it):
|
||||
- Pass the local `dbHealth` ref into the publish actor:
|
||||
```csharp
|
||||
var publishActor = system.ActorOf(
|
||||
OpcUaPublishActor.Props(
|
||||
sink: addressSpaceSink, serviceLevel: serviceLevel, localNode: roleInfo.LocalNode,
|
||||
dbFactory: dbFactory, applier: applier,
|
||||
dbHealthProbe: dbHealth), // NEW — enables the calculator path in production
|
||||
OpcUaPublishActorName);
|
||||
```
|
||||
- Spawn the supervisor after `publishActor`:
|
||||
```csharp
|
||||
var peerProbes = system.ActorOf(PeerProbeSupervisor.Props(roleInfo.LocalNode), PeerProbeSupervisorName);
|
||||
registry.Register<PeerProbeSupervisorKey>(peerProbes);
|
||||
```
|
||||
- Add `public const string PeerProbeSupervisorName = "peer-probe-supervisor";` near the other name consts and
|
||||
`public sealed class PeerProbeSupervisorKey { }` near the other key classes.
|
||||
- Confirm `OpcUaPublishActor.Props` signature now accepts `dbHealthProbe` (add the optional param in Task 2a;
|
||||
if not yet, this task is blocked — order 2a before 5).
|
||||
|
||||
**Step 2: Build + full test.**
|
||||
Run: `dotnet build ZB.MOM.WW.OtOpcUa.slnx && dotnet test tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests`
|
||||
Expected: clean build, all green. If a `ServiceCollectionExtensionsTests` enumerates spawned actors, extend
|
||||
it for the new supervisor.
|
||||
|
||||
**Step 3: Commit** by path (`ServiceCollectionExtensions.cs` only, plus the test if updated).
|
||||
`git commit -m "feat(redundancy): wire dbHealth into OpcUaPublishActor + spawn PeerProbeSupervisor per node"`
|
||||
|
||||
---
|
||||
|
||||
### Task 6: Docs — `docs/Redundancy.md` marks the calculator WIRED
|
||||
|
||||
**Classification:** small
|
||||
**Estimated implement time:** ~3 min
|
||||
**Parallelizable with:** Task 4
|
||||
|
||||
**Files:**
|
||||
- Modify: `docs/Redundancy.md`
|
||||
|
||||
**Step 1:** Update the doc to reflect shipped reality:
|
||||
- The `ServiceLevelCalculator` table row (line ~18) + the "Today only cluster topology drives the published
|
||||
ServiceLevel" line (~80) + the "designed to use once that path goes…" note (~85): change to **WIRED** —
|
||||
each driver node now computes its ServiceLevel via `ServiceLevelCalculator` from local `DbHealthProbeActor`
|
||||
reachability, a peer OPC UA probe (`PeerProbeSupervisor` → `PeerOpcUaProbeActor`, "peer-probes-me"), and
|
||||
signal-freshness staleness.
|
||||
- Document the **backward-compat seam**: a node with no DB-health source falls back to role-only.
|
||||
- Document the **behavior change**: a healthy Secondary now publishes **240** (was 100); both healthy nodes
|
||||
sit at 240/250 with the leader preferred by +10. Note `Stale = !DbReachable || signals older than the
|
||||
freshness window`, and that the calculator moved to `Core.Cluster`.
|
||||
- Keep the truth table (250/240/200/100/0) — it's now reachable.
|
||||
|
||||
**Step 2:** No build needed (doc only). Sanity-grep for now-false claims:
|
||||
`grep -n "not yet wired\|placeholder\|F10b\|coarse role" docs/Redundancy.md` and fix each.
|
||||
|
||||
**Step 3: Commit** by path.
|
||||
`git commit -m "docs(redundancy): ServiceLevelCalculator is wired into the live publish path"`
|
||||
|
||||
---
|
||||
|
||||
### Task 7: Full build + test + final integration review
|
||||
|
||||
**Classification:** high-risk
|
||||
**Estimated implement time:** ~4 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:** none (verification only)
|
||||
|
||||
**Step 1:** `dotnet build ZB.MOM.WW.OtOpcUa.slnx` — clean (TreatWarningsAsErrors).
|
||||
**Step 2:** `dotnet test ZB.MOM.WW.OtOpcUa.slnx` — full suite green (esp. `OpcUaPublishActorTests`,
|
||||
`PeerProbeSupervisorTests`, `ServiceLevelCalculatorTests`, `RedundancyStateActorTests`, `ServiceLevelEndToEndTests`).
|
||||
**Step 3:** Final integration reviewer subagent: confirm (a) no Runtime→ControlPlane reference was added;
|
||||
(b) the two pre-existing role-only tests are still green via the legacy seam; (c) no Commons contract / EF
|
||||
change; (d) staging was by-path only; (e) the Detached guard + probe-freshness debounce + DB-Ask-timeout
|
||||
fail-safe are all present. Apply any actionable findings as follow-up commits.
|
||||
**Step 4:** Update `pending.md` (working-tree only, never staged) with the Phase 2 status.
|
||||
|
||||
---
|
||||
|
||||
### Task 8: Live `/run` on the 2-node rig (acceptance gate)
|
||||
|
||||
**Classification:** high-risk (verification)
|
||||
**Estimated implement time:** ~6 min (agent-driven; docker-dev login is disabled — agent does NOT sign in to anything)
|
||||
|
||||
**Files:** none (runtime verification on docker-dev MAIN = central-1/central-2, the 2-node warm pair).
|
||||
|
||||
**Recipe:**
|
||||
1. Rebuild the local rig to this branch: `docker compose -f docker-dev/docker-compose.yml up -d --build`
|
||||
(do NOT stage compose changes). Confirm both central nodes are healthy and bootstrap-deploy the config.
|
||||
2. **Steady state:** read each node's `Server.ServiceLevel` via Client.CLI
|
||||
(`dotnet run --project src/Client/ZB.MOM.WW.OtOpcUa.Client.CLI -- read -u opc.tcp://localhost:4840 -n "i=2267"`
|
||||
— `ServiceLevel` is NodeId `i=2267`; confirm the actual id) on `:4840` (central-1) and `:4841` (central-2).
|
||||
Expect the leader at **250** and the follower at **240** (the documented healthy pair).
|
||||
3. **Demote one node's DB:** isolate ONLY one central node from SQL (they share the SQL container, so do
|
||||
NOT stop SQL). Use `docker network disconnect <net> otopcua-dev-central-1-1` (or block its SQL route) so
|
||||
central-1's `DbHealthProbeActor` starts failing while central-2 stays healthy.
|
||||
4. Within ~`max(healthTickInterval, staleWindow)` confirm central-1's `ServiceLevel` drops to **100** (DB
|
||||
unreachable ⇒ stale) — i.e. **below** central-2's 240 — proving health-aware demotion + the failover
|
||||
ordering. Reconnect the network and confirm it climbs back to 240/250.
|
||||
5. Capture the observed bytes. If the docker network manipulation is not feasible on the rig, record exactly
|
||||
what blocked and hand that single step to the user; everything else is agent-driven.
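
The failover ordering this recipe checks can be stated as code, assuming a redundancy-aware client simply prefers the endpoint that reports the highest `ServiceLevel`. `FailoverOrderSketch` and `Preferred` are hypothetical names, and the byte values are the ones the recipe expects rather than observed results:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FailoverOrderSketch
{
    // Picks the endpoint with the highest ServiceLevel; ties break by name for determinism.
    public static string Preferred(IReadOnlyDictionary<string, byte> levels) =>
        levels.OrderByDescending(kv => kv.Value)
              .ThenBy(kv => kv.Key, StringComparer.Ordinal)
              .First().Key;
}
```

With the steady-state pair (`central-1` = 250, `central-2` = 240) the preferred endpoint is `central-1`; once `central-1` drops to 100 it becomes `central-2`, which is the ordering step 4 is meant to prove.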
|
||||
|
||||
**Done when:** build clean + full `dotnet test` green + the live demotion observed (or the precise blocker
|
||||
recorded). Then `finishing-a-development-branch` → merge to master + push.
|
||||
@@ -0,0 +1,16 @@
|
||||
{
|
||||
"planPath": "docs/plans/2026-06-15-stillpending-phase-2-servicelevel.md",
|
||||
"branch": "feat/stillpending-phase-2-servicelevel",
|
||||
"tasks": [
|
||||
{"id": 414, "subject": "P2 Task 1: Move ServiceLevelCalculator to Core.Cluster", "status": "pending"},
|
||||
{"id": 415, "subject": "P2 Task 2a: OpcUaPublishActor calculator path (DB+stale+leader+Detached guard, legacy seam)", "status": "pending", "blockedBy": [414]},
|
||||
{"id": 416, "subject": "P2 Task 2b: OpcUaProbeOk from peer-probes-me (freshness + debounce)", "status": "pending", "blockedBy": [415]},
|
||||
{"id": 417, "subject": "P2 Task 3: HealthTick — periodic DB Ask/PipeTo + PreStart immediate refresh", "status": "pending", "blockedBy": [416]},
|
||||
{"id": 418, "subject": "P2 Task 4: PeerProbeSupervisor — one peer probe per driver peer", "status": "pending"},
|
||||
{"id": 419, "subject": "P2 Task 5: Wire WithOtOpcUaRuntimeActors (dbHealth ref + spawn supervisor)", "status": "pending", "blockedBy": [417, 418]},
|
||||
{"id": 420, "subject": "P2 Task 6: docs/Redundancy.md — calculator is WIRED", "status": "pending"},
|
||||
{"id": 421, "subject": "P2 Task 7: Full build + test + final integration review", "status": "pending", "blockedBy": [419, 420]},
|
||||
{"id": 422, "subject": "P2 Task 8: Live /run on the 2-node rig (acceptance gate)", "status": "pending", "blockedBy": [421]}
|
||||
],
|
||||
"lastUpdated": "2026-06-15"
|
||||
}
|
||||