10 Commits

Author SHA1 Message Date
Joseph Doherty fc52fbce49 docs: follow-ons tasks all complete
v2-ci / build (push) Failing after 43s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
2026-06-07 10:56:27 -04:00
Joseph Doherty f817fc8a8f fix(docker-dev): pin EF/AspNetCore logs to Warning + per-service mem limits to stop OOM/starvation 2026-06-07 10:53:08 -04:00
Joseph Doherty 46aba992c5 fix(config): DraftSnapshotFactory loads only active (unreleased) reservations
Filter ExternalIdReservations to WHERE ReleasedAt IS NULL so
DraftSnapshot.ActiveReservations matches its documented semantics and
ValidateReservationPreflight cannot emit spurious BadDuplicateExternalIdentifier
errors from already-released rows. Adds a focused unit test seeding one active
and one released reservation and asserting only the active row is returned.
2026-06-07 10:47:33 -04:00
Joseph Doherty 1023209d52 feat(deploy): reject Tag/VirtualTag NodeId collisions at deploy (surgical DraftValidator gate) 2026-06-07 10:42:13 -04:00
Joseph Doherty fce66d104a refactor(config): materialise collision groups once; note VirtualTag folder coupling 2026-06-07 10:37:22 -04:00
Joseph Doherty 83c7149be0 feat(config): DraftValidator rule + DraftSnapshot.VirtualTags for Tag/VirtualTag NodeId collisions 2026-06-07 10:33:45 -04:00
Joseph Doherty 6b36eff2d3 refactor(runtime): capture-first in HandleWriteAsync; assert no handler leak on resubscribe; fix stale comment 2026-06-07 10:31:20 -04:00
Joseph Doherty 98259ab026 fix(runtime): capture Sender before await in DriverInstanceActor subscribe (no-ActorContext race) 2026-06-07 10:26:17 -04:00
Joseph Doherty ce8c0811eb docs(plan): OtOpcUa follow-ons implementation plan (subscribe race, collision gate, docker-dev) 2026-06-07 10:20:52 -04:00
Joseph Doherty 4cb488c53e docs(design): OtOpcUa follow-ons — subscribe race, signal-name collision validation, docker-dev resources 2026-06-07 10:06:08 -04:00
14 changed files with 887 additions and 15 deletions
+9
@@ -104,6 +104,15 @@ The `-v` drops the SQL volume; remove it to keep ConfigDb state across restarts.
2. `docker compose -f docker-dev/docker-compose.yml stop central-1` → `central-2` should pick up the admin role-leader within ~15 s (Akka split-brain stable-after). Traefik will route traffic to `central-2` once its `/health/active` returns 200.
3. `docker compose -f docker-dev/docker-compose.yml start central-1` → `central-1` rejoins as a follower; `central-2` keeps the leader role until something disturbs it.
## Resource limits & dev logging
The full single-mesh stack (`central-1`/`central-2` + the four site nodes) can OOM-kill `central-1` on a loaded host. Two settings in the compose file guard against that:
- **EF Core + ASP.NET Core logs are pinned to `Warning`** on every host node (`Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore` / `…Microsoft.AspNetCore` = `Warning`). The host logs via Serilog (`AddZbSerilog` → `ReadFrom.Configuration`), and in `Development` the default level is `Debug`. Without these overrides every Deployment-poll emits an `Executed DbCommand` / `SELECT … FROM [Deployment]` line, flooding the Serilog pipeline and starving the Akka cluster heartbeat thread. Application + Akka log levels are left untouched, so this only silences the per-poll SQL chatter. To temporarily restore the SQL log flood for debugging, drop those two env vars (or set them back to `Information`) on the node you're inspecting.
- **Each host node has `mem_limit: 1g`** (`mem_reservation: 512m`). A quiet solo `central-1` measures ~357 MiB; the limit leaves headroom for the deploy/UI load and per-cluster driver subscriptions that push a fully-loaded node higher. The limit/reservation live on the `&otopcua-host` anchor, so all six host services inherit them; `sql`, `traefik`, and the one-shot `migrator`/`cluster-seed` are left unbounded.
The full six-node host stack therefore needs roughly **6 GiB** of Docker Desktop VM memory just for the host nodes (plus SQL Server's own footprint on top). On a constrained host, either raise the Docker Desktop VM memory or run fewer host services (e.g. just `central-1` + `central-2`, or a single central node) rather than the full mesh.
## Notes
- This compose is for the **local Mac/Linux developer rig**. The team's CI + soak runs go to the remote docker host at `10.100.0.35` (see `docs/v2/dev-environment.md`); the file there mirrors this one with adjusted port bindings.
+31
@@ -121,6 +121,16 @@ services:
dockerfile: docker-dev/Dockerfile
target: runtime
image: otopcua-host:dev
# Per-node memory bounds. The full single-mesh stack (6 host nodes) OOM-killed
# central-1 on a loaded host. Each host node measured ~357 MiB idle-solo and
# climbs under the full mesh + deploy/UI load, so cap at 1g (≈peak + headroom)
# with a 512m reservation. These top-level keys are inherited by every service
# that uses `<<: *otopcua-host` (YAML merge keeps the anchor's scalar keys; only
# the `environment` block is re-declared per service). Compose v2 honors
# `mem_limit`/`mem_reservation`. The full mesh needs ~6g of Docker Desktop VM
# memory — on a constrained host raise the VM memory or run fewer host services.
mem_limit: 1g
mem_reservation: 512m
depends_on:
sql: { condition: service_healthy }
migrator: { condition: service_completed_successfully }
@@ -147,6 +157,13 @@ services:
Security__Ldap__ServiceAccountDn: "cn=serviceaccount,dc=zb,dc=local"
Security__Ldap__ServiceAccountPassword: "serviceaccount123"
Security__DeployApiKey: "docker-dev-deploy-key"
# Pin EF Core + ASP.NET Core to Warning so the per-poll Deployment SELECT /
# "Executed DbCommand" Information|Debug lines stop flooding the Serilog
# pipeline and starving the Akka cluster heartbeat thread. The host logs via
# Serilog (AddZbSerilog → ReadFrom.Configuration); these env vars override
# Serilog:MinimumLevel:Override:* (app/Akka levels are left untouched).
Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore: "Warning"
Serilog__MinimumLevel__Override__Microsoft.AspNetCore: "Warning"
GALAXY_MXGW_API_KEY: "${GALAXY_MXGW_API_KEY:-mxgw_otopcua2_GI7-tNozYE6cXGUSgEzL3AHDV7bYcYIHdMwKYgyHdX4}"
ports:
- "4840:4840"
@@ -180,6 +197,10 @@ services:
Security__Ldap__ServiceAccountDn: "cn=serviceaccount,dc=zb,dc=local"
Security__Ldap__ServiceAccountPassword: "serviceaccount123"
Security__DeployApiKey: "docker-dev-deploy-key"
# Quiet EF/AspNetCore SQL flood — see central-1 (Serilog override). mem_limit/
# mem_reservation are inherited from the *otopcua-host anchor.
Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore: "Warning"
Serilog__MinimumLevel__Override__Microsoft.AspNetCore: "Warning"
GALAXY_MXGW_API_KEY: "${GALAXY_MXGW_API_KEY:-mxgw_otopcua2_GI7-tNozYE6cXGUSgEzL3AHDV7bYcYIHdMwKYgyHdX4}"
ports:
- "4841:4840"
@@ -203,6 +224,10 @@ services:
Cluster__PublicHostname: "site-a-1"
Cluster__SeedNodes__0: "akka.tcp://otopcua@central-1:4053"
Cluster__Roles__0: "driver"
# Quiet EF/AspNetCore SQL flood — see central-1 (Serilog override). mem_limit/
# mem_reservation are inherited from the *otopcua-host anchor.
Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore: "Warning"
Serilog__MinimumLevel__Override__Microsoft.AspNetCore: "Warning"
# Resolved at runtime by GalaxyDriver.ResolveApiKey when a DriverInstance's
# Gateway.ApiKeySecretRef = "env:GALAXY_MXGW_API_KEY".
GALAXY_MXGW_API_KEY: "${GALAXY_MXGW_API_KEY:-mxgw_otopcua2_GI7-tNozYE6cXGUSgEzL3AHDV7bYcYIHdMwKYgyHdX4}"
@@ -223,6 +248,8 @@ services:
Cluster__PublicHostname: "site-a-2"
Cluster__SeedNodes__0: "akka.tcp://otopcua@central-1:4053"
Cluster__Roles__0: "driver"
Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore: "Warning"
Serilog__MinimumLevel__Override__Microsoft.AspNetCore: "Warning"
GALAXY_MXGW_API_KEY: "${GALAXY_MXGW_API_KEY:-mxgw_otopcua2_GI7-tNozYE6cXGUSgEzL3AHDV7bYcYIHdMwKYgyHdX4}"
ports:
- "4843:4840"
@@ -243,6 +270,8 @@ services:
Cluster__PublicHostname: "site-b-1"
Cluster__SeedNodes__0: "akka.tcp://otopcua@central-1:4053"
Cluster__Roles__0: "driver"
Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore: "Warning"
Serilog__MinimumLevel__Override__Microsoft.AspNetCore: "Warning"
GALAXY_MXGW_API_KEY: "${GALAXY_MXGW_API_KEY:-mxgw_otopcua2_GI7-tNozYE6cXGUSgEzL3AHDV7bYcYIHdMwKYgyHdX4}"
ports:
- "4844:4840"
@@ -261,6 +290,8 @@ services:
Cluster__PublicHostname: "site-b-2"
Cluster__SeedNodes__0: "akka.tcp://otopcua@central-1:4053"
Cluster__Roles__0: "driver"
Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore: "Warning"
Serilog__MinimumLevel__Override__Microsoft.AspNetCore: "Warning"
GALAXY_MXGW_API_KEY: "${GALAXY_MXGW_API_KEY:-mxgw_otopcua2_GI7-tNozYE6cXGUSgEzL3AHDV7bYcYIHdMwKYgyHdX4}"
ports:
- "4845:4840"
@@ -0,0 +1,112 @@
# OtOpcUa follow-ons design (2026-06-07)
Three independent hardening items surfaced during the Equipment-namespace live-values milestone
(`docs/plans/2026-06-07-equipment-namespace-live-values.md`). All live in OtOpcUa. They are
independent and ship as separate commits/tasks.
---
## 1. Fix the `DriverInstanceActor` subscribe race (Runtime — real bug)
**File:** `src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs`
**Root cause.** `HandleSubscribeAsync` (dispatched via `ReceiveAsync<Subscribe>`) captures `Sender`/`Self`
on lines 362-363 — *after* the conditional `await UnsubscribeAsync().ConfigureAwait(false)` on line 359.
`ConfigureAwait(false)` opts out of Akka's `ActorTaskScheduler`, so in the re-subscribe path
(`_subscriptionHandle is not null`, which fires on every deploy re-apply and on bootstrap restore) the
post-await `Sender` read (`Sender` = `Context.Sender`) runs on a thread-pool thread with no active actor
context → `System.NotSupportedException: There is no active ActorContext`. It is provoked further when the
actor is stopping mid-await (a Subscribe lands during a deploy/restore reconcile while the driver is being
torn down) — observed live as `GalaxyDriver … shutting down` immediately followed by the error. Recovers on
a clean node restart, which is why a non-raced bootstrap subscribed cleanly.
**Fix (two parts).**
1. **Capture before await.** Move `var replyTo = Sender; var self = Self;` to the very top of
`HandleSubscribeAsync`, before the `_subscriptionHandle is not null` check + its await. Use
`replyTo`/`self` everywhere after; never read raw `Sender`/`Self`/`Context` past any await. This removes
the failing line.
2. **Drop `.ConfigureAwait(false)` inside the `ReceiveAsync` handlers** (`HandleSubscribeAsync`,
`UnsubscribeAsync`, `HandleApplyDeltaAsync`, `HandleWriteAsync`) so continuations resume on the actor's
`ActorTaskScheduler` (idiomatic Akka — preserves actor context, handles the stopping actor gracefully).
Audit each handler for any other post-await `Context`/`Sender`/`Self` access while doing so. (Note:
`HandleApplyDeltaAsync` and `HandleWriteAsync` already capture `replyTo = Sender` before their awaits;
only `HandleSubscribeAsync` has the after-await capture — confirm during implementation.)
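The capture-before-await rule can be illustrated without Akka. The sketch below is a toy model under stated assumptions: `Ambient`, `Dispatch`, and `Handle` are hypothetical names, and the thread-static field only stands in for an ambient context that a runtime sets while the synchronous part of a handler runs. It is not Akka's API or implementation.

```csharp
using System;
using System.Threading.Tasks;

// Toy stand-in for a thread-bound ambient context (hypothetical; not Akka's API).
public static class Ambient
{
    [ThreadStatic] public static string? Current;
}

public static class CaptureBeforeAwaitDemo
{
    // Mimics a runtime that exposes the ambient context only while the
    // synchronous part of a handler runs, then clears it.
    public static Task<(string? Captured, string? ReadAfterAwait)> Dispatch()
    {
        Ambient.Current = "sender-1";
        try
        {
            return Handle(); // returns at the first incomplete await
        }
        finally
        {
            Ambient.Current = null;
        }
    }

    private static async Task<(string? Captured, string? ReadAfterAwait)> Handle()
    {
        var captured = Ambient.Current;              // safe: read before any await
        await Task.Delay(20).ConfigureAwait(false);  // continuation resumes off the dispatching context
        var late = Ambient.Current;                  // unsafe: the ambient context is gone here
        return (captured, late);
    }
}
```

Calling `CaptureBeforeAwaitDemo.Dispatch()` and awaiting the result should yield a non-null `Captured` and a null `ReadAfterAwait`, which is the shape of the bug: the local survives the await, the ambient read does not.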
**Out of scope.** The `ReceiveAsync` mailbox-blocking semantics (one message at a time during a long
SubscribeAsync) are unchanged — not part of this bug.
**Testing.** A TestKit test that subscribes, then subscribes again (drives the line-359
unsubscribe-then-resubscribe path) and asserts `SubscriptionEstablished` is replied with no exception
(today this throws). Use a fake `ISubscribable` driver. The stopping-mid-await race is hard to make
deterministic; the capture-before-await fix removes the failing access regardless.
---
## 2. Validate Tag↔VirtualTag name collisions at deploy (Configuration)
**Files:** `src/Core/ZB.MOM.WW.OtOpcUa.Configuration/Validation/DraftValidator.cs` + the `DraftSnapshot`
type + wherever `DraftSnapshot` is built (locate during planning).
**Problem.** `Phase7Applier` materialises both Equipment `Tag` variables and `VirtualTag` variables at the
folder-scoped NodeId `{EquipmentId}[/{FolderPath}]/{Name}`. `UX_Tag_EquipmentPath` and
`UX_VirtualTag_EquipmentPath` each enforce `(EquipmentId, Name)` uniqueness *within their own table*, but
there is **no cross-table constraint** — a `Tag` and a `VirtualTag` with the same `(EquipmentId, Name)` (and
same/empty FolderPath) would materialise to the same NodeId, and the sink (keyed on NodeId) would keep only
one. Not hit today (the company overlay is VirtualTag-only), but latent for mixed equipment.
**Change.**
- Extend `DraftSnapshot` to carry equipment-bound signal identities: the `(EquipmentId, FolderPath, Name)`
of every `Tag` with a non-null `EquipmentId` and every `VirtualTag` (FolderPath = "" for VirtualTags).
Populate them where `DraftSnapshot` is constructed.
- New rule `DraftValidator.ValidateNoEquipmentSignalNameCollision`: group those signals by the
**folder-scoped key** `(EquipmentId, normalize(FolderPath), Name)` — exactly the materialiser's NodeId key
— and emit a `ValidationError("EquipmentSignalNameCollision", …)` for any group with count > 1 (carry the
EquipmentId + Name in the error). Wire it into `DraftValidator.Validate`.
**Why the folder-scoped key.** A Tag under a sub-folder (`{Eq}/sub/Name`) does not collide with a root
VirtualTag (`{Eq}/Name`) of the same Name. Keying on the full NodeId path avoids false positives. Galaxy /
SystemPlatform Tags (`EquipmentId` null) live in a different node space and are excluded.
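A minimal sketch of the folder-scoped key, assuming a simplified `Signal` record in place of the real `Tag`/`VirtualTag` entities (the record and the `NodeIdKeyDemo` class are illustrative names, not part of the codebase):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal shape; the real entities carry more fields.
public sealed record Signal(string EquipmentId, string? FolderPath, string Name);

public static class NodeIdKeyDemo
{
    // Mirrors the "{EquipmentId}[/{FolderPath}]/{Name}" key described above.
    public static string Key(Signal s) =>
        string.IsNullOrWhiteSpace(s.FolderPath)
            ? $"{s.EquipmentId}/{s.Name}"
            : $"{s.EquipmentId}/{s.FolderPath}/{s.Name}";

    // Returns every key that more than one signal maps to.
    public static List<string> CollidingKeys(IEnumerable<Signal> signals) =>
        signals.GroupBy(s => Key(s), StringComparer.Ordinal)
               .Where(g => g.Count() > 1)
               .Select(g => g.Key)
               .ToList();
}
```

With this key, `new Signal("eq-1", "metrics", "speed")` and `new Signal("eq-1", null, "speed")` map to `eq-1/metrics/speed` and `eq-1/speed` and do not collide, while two root-level signals named `speed` on `eq-1` do.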
**Testing.** Validator unit tests: a Tag + a VirtualTag with the same `(EquipmentId, Name)` (empty
FolderPath) → `EquipmentSignalNameCollision`; same Name but different FolderPath → OK; distinct names → OK;
a SystemPlatform Tag sharing a name with an equipment VirtualTag → OK (excluded).
---
## 3. docker-dev resource hardening (compose/config only — no app code)
**File:** `docker-dev/docker-compose.yml` (+ `docker-dev/README.md`).
**Problem.** The full single-mesh stack (central-1/2 + 4 site nodes) OOM-killed central-1 on a loaded host
(Docker Desktop VM memory cap; `OOMKilled=true`, host load ~12). Two contributors: (a) verbose EF Core
`Development` logging — every Deployment-poll `SELECT` + `Executed DbCommand` logged, flooding the dispatcher
and starving the cluster heartbeat thread (observed `heartbeat … delayed 4169ms … thread starvation`);
(b) no per-container memory limits, so total footprint is unbounded against the VM cap.
**Change.**
- **Quiet logging:** add `Logging__LogLevel__Microsoft.EntityFrameworkCore=Warning` (and
`Logging__LogLevel__Microsoft.AspNetCore=Warning`) to the host services' `environment`. Keep application
logs (`Default`/`ZB.MOM.WW`) at Info. This removes the SQL flood and the starvation cascade.
- **Per-service memory bounds:** add `mem_limit` (and `mem_reservation`) to each host service (central-1/2,
site-a/b), sized by measuring a steady-state node (`docker stats`) and adding headroom. Document in
`docker-dev/README.md` the approximate total Docker Desktop VM memory the full stack needs, so a
constrained host knows to raise the VM or run fewer nodes.
**Coordinate with active docker-dev work.** `master` has recent in-flight docker-dev commits
(single-mesh topology, fresh-volume bootstrap). Rebase/merge carefully so these changes don't clobber that
work; keep the edits additive (env keys + `mem_limit` lines) where possible.
**Testing.** Operational only: bring the full stack up with the changes, confirm no OOM and all nodes serve
(`:9200` healthy, `:4840` listening on central-1, galaxy mirror Good). Not unit-testable.
---
## Sequencing & risk
| # | Area | Risk | Ships as |
|---|---|---|---|
| 1 | Runtime actor (`DriverInstanceActor`) | Medium (actor semantics) — small, well-understood | own commit + TestKit test |
| 2 | Configuration (`DraftSnapshot` + `DraftValidator`) | Low-medium (new snapshot field + rule) | own commit + validator tests |
| 3 | docker-dev compose/README | Low (config only) | own commit; coordinate with active docker-dev work |
All three are independent and can be reviewed/merged separately. No cross-dependencies.
+286
@@ -0,0 +1,286 @@
# OtOpcUa Follow-ons Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans (or subagent-driven-development) to implement this plan task-by-task.
**Goal:** Ship three independent hardening items from `docs/plans/2026-06-07-otopcua-followons-design.md`: (1) fix the `DriverInstanceActor` subscribe ActorContext race, (2) reject Tag↔VirtualTag NodeId collisions at deploy via a surgical validator gate, (3) make the docker-dev stack survive a constrained host.
**Architecture:** All three live in OtOpcUa, on branch `feat/otopcua-followons` (off `master` `4c221ce`). They are independent (no cross-deps except T3→T2). #1 is a Runtime actor fix; #2 wires the (currently dormant) `DraftValidator` into the deploy handler but gates *only* on the new collision rule (other dormant rules run but don't block — avoids breaking the existing non-canonical company overlay); #3 is docker-dev config only.
**Tech Stack:** .NET 10, Akka.NET (TestKit), EF Core (`OtOpcUaConfigDbContext`), xUnit + Shouldly, Docker Compose.
**Decisions locked (from brainstorming):** #2 = **surgical gate** (reject only on `EquipmentSignalNameCollision`; build the `DraftSnapshot` + wire `DraftValidator.Validate` so it's future-ready, but filter the gate to the one code). #3 = quiet EF/AspNetCore logging + per-service `mem_limit`/`mem_reservation` + README (no lite profile).
**Environment caveat:** docker-dev is being actively reconfigured by the user (DB reset, single-mesh, fresh-volume bootstrap). Do **not** rely on live-DB state for tests — #2's tests are unit/TestKit with synthetic snapshots; #3 is verified by bringing the stack up. Coordinate #3's edits with the in-flight docker-dev commits on master (keep them additive).
---
## Task graph
```
T1 (#1 subscribe race) ┐ independent, disjoint files
T2 (#2 snapshot field + rule) ┤ → T3 (#2 deploy gate)
T4 (#3 docker-dev config) ┘
```
T1, T2, T4 are mutually parallelizable. T3 depends on T2.
---
### Task 1: Fix the `DriverInstanceActor` subscribe ActorContext race
**Classification:** high-risk
**Estimated implement time:** ~5 min
**Parallelizable with:** Task 2, Task 4
**Files:**
- Modify: `src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs` (`HandleSubscribeAsync` ~349-384, `UnsubscribeAsync` ~386-403, `HandleWriteAsync` ~315-345 — drop `ConfigureAwait(false)`)
- Test: `tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs`
**Root cause (recap):** `HandleSubscribeAsync` reads `Sender` (= `Context.Sender`) on line 362 *after* a conditional `await UnsubscribeAsync().ConfigureAwait(false)` on line 359. `ConfigureAwait(false)` resumes the continuation off Akka's `ActorTaskScheduler`, so the re-subscribe path reads `Context.Sender` on a thread-pool thread → `no active ActorContext`.
**Step 1 — Write the failing test.** In `DriverInstanceActorTests.cs` (read it first for the existing fake-`ISubscribable` driver + TestKit harness pattern; reuse the fake driver that drives the actor to the `Connected` state). Add a test that subscribes, then subscribes AGAIN (so `_subscriptionHandle is not null` → drives the line-359 unsubscribe-then-resubscribe path):
```csharp
[Fact]
public void Resubscribe_does_not_throw_no_active_ActorContext()
{
    var (actor, _) = CreateConnectedSubscribableDriver(); // mirror the existing helper

    actor.Tell(new DriverInstanceActor.Subscribe(new[] { "ref.A" }, TimeSpan.FromSeconds(1)));
    ExpectMsg<DriverInstanceActor.SubscriptionEstablished>();

    // Second subscribe exercises the await-Unsubscribe-then-read-Sender path that threw.
    actor.Tell(new DriverInstanceActor.Subscribe(new[] { "ref.B" }, TimeSpan.FromSeconds(1)));
    ExpectMsg<DriverInstanceActor.SubscriptionEstablished>(TimeSpan.FromSeconds(5));
}
```
(Match the real message type names/namespaces from the actor. If the existing tests already have a "subscribe" test, clone its setup.)
**Step 2 — Run, verify it fails.** `dotnet test tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ --filter "FullyQualifiedName~DriverInstanceActor"` — expect the new test to fail (the second subscribe throws `no active ActorContext`, so no `SubscriptionEstablished` arrives → `ExpectMsg` times out).
**Step 3 — Implement.** In `HandleSubscribeAsync`, move the `Sender`/`Self` capture to the **top**, before the `ISubscribable` check and the conditional unsubscribe await; use the captured locals everywhere after; and drop `.ConfigureAwait(false)` from the awaits inside this and the sibling `ReceiveAsync` handlers:
```csharp
private async Task HandleSubscribeAsync(Subscribe msg)
{
    var replyTo = Sender; // capture BEFORE any await (Context.Sender is invalid post-await)
    var self = Self;

    if (_driver is not ISubscribable subscribable)
    {
        replyTo.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
        return;
    }

    if (_subscriptionHandle is not null)
    {
        await UnsubscribeAsync(); // no ConfigureAwait(false): resume on the actor context
    }

    try
    {
        _dataChangeHandler = (_, args) => self.Tell(new DataChangeForward(args.FullReference, args.Snapshot));
        subscribable.OnDataChange += _dataChangeHandler;
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        _subscriptionHandle = await subscribable
            .SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token); // no ConfigureAwait(false)
        replyTo.Tell(new SubscriptionEstablished(_subscriptionHandle.DiagnosticId, msg.FullReferences.Count));
        _log.Info("DriverInstance {Id}: subscribed to {Count} refs ({Diag})",
            _driverInstanceId, msg.FullReferences.Count, _subscriptionHandle.DiagnosticId);
    }
    catch (Exception ex)
    {
        DetachSubscription();
        _log.Warning(ex, "DriverInstance {Id}: subscribe failed", _driverInstanceId);
        replyTo.Tell(new SubscriptionFailed(ex.Message));
    }
}
```
Also remove `.ConfigureAwait(false)` from the awaits in `UnsubscribeAsync` (line ~396) and `HandleWriteAsync` (line ~330) so every `ReceiveAsync` continuation resumes on the actor context. (`HandleApplyDeltaAsync` already captures `Sender` first and has no `ConfigureAwait(false)` — leave it.) Audit each handler: no raw `Sender`/`Self`/`Context` access past any await.
**Step 4 — Run, verify it passes.** Same filter → green. Then run the broader Runtime driver slice: `dotnet test tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ --filter "FullyQualifiedName~DriverInstance|FullyQualifiedName~DriverHost"` — no regression.
**Step 5 — Commit.** `git add … && git commit -m "fix(runtime): capture Sender before await in DriverInstanceActor subscribe (no-ActorContext race)"`
---
### Task 2: Add `VirtualTags` to `DraftSnapshot` + the collision rule
**Classification:** standard
**Estimated implement time:** ~5 min
**Parallelizable with:** Task 1, Task 4
**Files:**
- Modify: `src/Core/ZB.MOM.WW.OtOpcUa.Configuration/Validation/DraftSnapshot.cs` (add `VirtualTags`)
- Modify: `src/Core/ZB.MOM.WW.OtOpcUa.Configuration/Validation/DraftValidator.cs` (add rule + wire into `Validate`)
- Test: `tests/Core/ZB.MOM.WW.OtOpcUa.Configuration.Tests/DraftValidatorTests.cs`
**Step 1 — Write failing tests.** In `DraftValidatorTests.cs` (read it for the snapshot-builder helper the existing tests use — they construct `DraftSnapshot` via object initializer with the entities). Add:
```csharp
[Fact]
public void Tag_and_VirtualTag_same_equipment_and_name_collide()
{
    // MakeDraft stands in for the suite's existing snapshot-builder helper;
    // the `with` expression assumes DraftSnapshot is a record.
    var draft = MakeDraft(/* one Equipment eq-1 */) with
    {
        Tags = [new Tag { TagId = "t1", EquipmentId = "eq-1", Name = "speed", DataType = "Float64", FolderPath = null /* + required fields */ }],
        VirtualTags = [new VirtualTag { VirtualTagId = "v1", EquipmentId = "eq-1", Name = "speed", DataType = "Float64", ScriptId = "s1" }],
    };
    DraftValidator.Validate(draft).ShouldContain(e => e.Code == "EquipmentSignalNameCollision");
}

[Fact]
public void Same_name_different_folder_does_not_collide()
{
    // Tag under FolderPath "metrics" vs VirtualTag at equipment root: different NodeId, so OK
    var draft = MakeDraft(/* one Equipment eq-1 */) with
    {
        Tags = [new Tag { EquipmentId = "eq-1", Name = "speed", FolderPath = "metrics" /* + required fields */ }],
        VirtualTags = [new VirtualTag { EquipmentId = "eq-1", Name = "speed" /* + required fields */ }],
    };
    DraftValidator.Validate(draft).ShouldNotContain(e => e.Code == "EquipmentSignalNameCollision");
}

[Fact]
public void SystemPlatform_tag_sharing_name_with_equipment_vtag_is_excluded()
{
    // Tag with EquipmentId == null (galaxy/SystemPlatform) never collides with an equipment vtag
    var draft = MakeDraft(/* one Equipment eq-1 */) with
    {
        Tags = [new Tag { EquipmentId = null, Name = "speed" /* + required fields */ }],
        VirtualTags = [new VirtualTag { EquipmentId = "eq-1", Name = "speed" /* + required fields */ }],
    };
    DraftValidator.Validate(draft).ShouldNotContain(e => e.Code == "EquipmentSignalNameCollision");
}
```
(Fill in every `required`/NOT-NULL field on `Tag`/`VirtualTag`/`Equipment` — read the entity classes `src/Core/ZB.MOM.WW.OtOpcUa.Configuration/Entities/{Tag,VirtualTag,Equipment}.cs`. Mirror how existing tests build entities.)
**Step 2 — Run, verify fail** (compile error: `VirtualTags` not on `DraftSnapshot`; then rule missing).
`dotnet test tests/Core/ZB.MOM.WW.OtOpcUa.Configuration.Tests/ --filter "FullyQualifiedName~DraftValidator"`
**Step 3 — Implement.**
In `DraftSnapshot.cs`, after the `Tags` member:
```csharp
/// <summary>Equipment-bound VirtualTags (script-derived signals). Used by the NodeId-collision rule.</summary>
public IReadOnlyList<VirtualTag> VirtualTags { get; init; } = [];
```
In `DraftValidator.cs`, add the rule and call it from `Validate`:
```csharp
private static void ValidateNoEquipmentSignalNameCollision(DraftSnapshot draft, List<ValidationError> errors)
{
    // The materialiser keys equipment signal variables on the folder-scoped NodeId
    // "{EquipmentId}[/{FolderPath}]/{Name}". Tag (EquipmentId != null) and VirtualTag share that
    // node space with no DB-level cross-table uniqueness, so the same key from both collides.
    static string Key(string eq, string? folder, string name) =>
        string.IsNullOrWhiteSpace(folder) ? $"{eq}/{name}" : $"{eq}/{folder}/{name}";

    var signals = draft.Tags
        .Where(t => t.EquipmentId is not null)
        .Select(t => (Key: Key(t.EquipmentId!, t.FolderPath, t.Name), Eq: t.EquipmentId!, t.Name))
        .Concat(draft.VirtualTags
            .Select(v => (Key: Key(v.EquipmentId, null, v.Name), Eq: v.EquipmentId, v.Name)));

    foreach (var g in signals.GroupBy(s => s.Key, StringComparer.Ordinal).Where(g => g.Count() > 1))
    {
        var f = g.First();
        errors.Add(new("EquipmentSignalNameCollision",
            $"{g.Count()} signals collide on OPC UA NodeId '{g.Key}' (equipment '{f.Eq}', name '{f.Name}'); " +
            "a Name must be unique across Tag and VirtualTag within an equipment+folder",
            f.Eq));
    }
}
```
Add `ValidateNoEquipmentSignalNameCollision(draft, errors);` to the `Validate` method's rule list (around line 33). Confirm the `VirtualTag` entity exposes `EquipmentId`, `Name` (it does); it has no `FolderPath` (so vtags use `null` folder).
**Step 4 — Run, verify pass.** Same filter → green; then the whole Configuration.Tests suite to confirm no regression.
**Step 5 — Commit.** `git add … && git commit -m "feat(config): DraftValidator rule + DraftSnapshot.VirtualTags for Tag/VirtualTag NodeId collisions"`
---
### Task 3: Build `DraftSnapshot` from the DB + wire the surgical deploy gate
**Classification:** high-risk
**Estimated implement time:** ~5 min
**Parallelizable with:** none (depends on Task 2)
**Files:**
- Create: `src/Core/ZB.MOM.WW.OtOpcUa.Configuration/Validation/DraftSnapshotFactory.cs`
- Modify: `src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/AdminOperations/AdminOperationsActor.cs` (`HandleStartDeploymentAsync`, before the `ConfigComposer.SnapshotAndFlattenAsync` seal)
- Read first: `AdminOperationsActor.cs` (the `db` in scope + the `StartDeploymentOutcome` enum incl. `Rejected` + the `StartDeploymentResult` shape), and `src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Api/DeployApiEndpoints.cs` (confirm `Rejected` maps to a non-2xx HTTP — if not, map it, e.g. 422).
- Test: `tests/Core/ZB.MOM.WW.OtOpcUa.Configuration.Tests/DraftSnapshotFactoryTests.cs` (new) + extend an `AdminOperationsActor` test if one exists (TestKit) to assert a colliding config is Rejected.
**Design:** Build a `DraftSnapshot` from the live config DB and run the FULL `DraftValidator.Validate`, but **gate the deploy only on `EquipmentSignalNameCollision`** — other (dormant, possibly-failing) rules run but do not block. This wires the validator for future activation without breaking the non-canonical company overlay.
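The "run everything, block on one code" idea can be sketched in isolation. This is a minimal illustration under assumptions: `RuleError` and `SurgicalGate` are hypothetical names, not the project's `ValidationError` or any existing type, and the allow-list holds only the one code this plan gates on.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the validator's error type.
public sealed record RuleError(string Code, string Message);

public static class SurgicalGate
{
    // Only these codes block a deploy; every other rule result is advisory.
    private static readonly HashSet<string> BlockingCodes =
        new(StringComparer.Ordinal) { "EquipmentSignalNameCollision" };

    // Takes the full validator output and reports whether to reject, with a summary
    // built from the blocking errors only.
    public static (bool Rejected, string Summary) Evaluate(IEnumerable<RuleError> allErrors)
    {
        var blocking = allErrors.Where(e => BlockingCodes.Contains(e.Code)).ToList();
        return (blocking.Count > 0, string.Join("; ", blocking.Select(e => e.Message)));
    }
}
```

Keeping the blocking codes in a set means a dormant rule could later be promoted to a gate by adding its code, without touching the validator itself.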
**Step 1 — Write failing tests.**
`DraftSnapshotFactoryTests.cs`: using an in-memory `OtOpcUaConfigDbContext` (mirror how other Configuration.Tests build one) seeded with one Equipment + a Tag and a VirtualTag sharing (EquipmentId, Name), assert `DraftSnapshotFactory.FromConfigDbAsync(db)` returns a snapshot whose `Tags` and `VirtualTags` are populated, and `DraftValidator.Validate(snapshot)` contains `EquipmentSignalNameCollision`.
If `AdminOperationsActor` has a TestKit test harness, add: seed a colliding config → `StartDeployment` → `StartDeploymentResult` outcome is `Rejected` with the collision message; seed a non-colliding config → `Accepted` (sealed).
**Step 2 — Run, verify fail.**
**Step 3 — Implement.**
`DraftSnapshotFactory.cs` — load the tables the rules touch (so they don't NRE), gate-relevant ones fully populated:
```csharp
public static class DraftSnapshotFactory
{
    public static async Task<DraftSnapshot> FromConfigDbAsync(OtOpcUaConfigDbContext db, CancellationToken ct = default)
        => new DraftSnapshot
        {
            GenerationId = 0,          // generation model dropped; placeholder (no rule reads it)
            ClusterId = string.Empty,  // global snapshot; rules compare entity ClusterId fields, not this
            Namespaces = await db.Namespaces.AsNoTracking().ToListAsync(ct),
            DriverInstances = await db.DriverInstances.AsNoTracking().ToListAsync(ct),
            Devices = await db.Devices.AsNoTracking().ToListAsync(ct),
            UnsAreas = await db.UnsAreas.AsNoTracking().ToListAsync(ct),
            UnsLines = await db.UnsLines.AsNoTracking().ToListAsync(ct),
            Equipment = await db.Equipment.AsNoTracking().ToListAsync(ct),
            Tags = await db.Tags.AsNoTracking().ToListAsync(ct),
            VirtualTags = await db.VirtualTags.AsNoTracking().ToListAsync(ct),
            PollGroups = await db.PollGroups.AsNoTracking().ToListAsync(ct),
            PriorEquipment = [],       // no prior generation in the live model (UUID-immutability rule no-ops)
            ActiveReservations = await db.ExternalIdReservations.AsNoTracking().ToListAsync(ct),
            // Enterprise/Site left null; only PathLength reads them (under-counts; acceptable, not gated here)
        };
}
```
(Confirm the DbSet names: `db.ExternalIdReservations` etc. — read `OtOpcUaConfigDbContext`. Adjust if different.)
In `AdminOperationsActor.HandleStartDeploymentAsync`, immediately before `ConfigComposer.SnapshotAndFlattenAsync(db)`:
```csharp
var draft = await DraftSnapshotFactory.FromConfigDbAsync(db);
var collisions = DraftValidator.Validate(draft)
    .Where(e => e.Code == "EquipmentSignalNameCollision")
    .ToList();
if (collisions.Count > 0)
{
    var summary = string.Join("; ", collisions.Select(e => e.Message));
    _log.Warning("StartDeployment rejected: {Summary}", summary);
    replyTo.Tell(new StartDeploymentResult(StartDeploymentOutcome.Rejected, /* id */ null, summary /* match the result shape */));
    return;
}
```
(Match the real `StartDeploymentResult` constructor + how `replyTo`/the failure path is shaped at lines ~108-119.) If `DeployApiEndpoints` doesn't already map `Rejected` to a non-2xx, map it to **422 Unprocessable Entity** with the message.
**Step 4 — Run, verify pass.** Configuration.Tests + the AdminOperations slice green. Build the Host project to confirm the endpoint mapping compiles.
**Step 5 — Commit.** `git add … && git commit -m "feat(deploy): reject Tag/VirtualTag NodeId collisions at deploy (surgical DraftValidator gate)"`
---
### Task 4: docker-dev resource hardening (logging + memory + README)
**Classification:** standard
**Estimated implement time:** ~5 min
**Parallelizable with:** Task 1, Task 2
**Files:**
- Modify: `docker-dev/docker-compose.yml` (the host-service env + per-service limits; use the `&otopcua-host` anchor / per-service blocks)
- Modify: `docker-dev/README.md` (memory note)
- Read first: the current compose to see the shared anchor + each host service's `environment` block, and **rebase/coordinate** with the in-flight docker-dev commits on master (keep edits additive).
**Step 1 — Quiet logging.** Add to the host services' `environment` (prefer the `&otopcua-host` anchor so all inherit; if envs are per-service, add to each of central-1/2, site-a-1/2, site-b-1/2):
```yaml
Logging__LogLevel__Microsoft.EntityFrameworkCore: "Warning"
Logging__LogLevel__Microsoft.AspNetCore: "Warning"
```
(Keep `Default`/app loggers at their current level. This removes the per-poll `SELECT FROM Deployment` + `Executed DbCommand` flood that drove the heartbeat thread-starvation.)
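One way to check the effect of the log-level pin is to count the remaining EF command lines in a service's recent logs. This is a minimal sketch, assuming the flood lines contain the literal text `Executed DbCommand` as described above; the helper name `count_ef_command_lines` is hypothetical and not part of the repo.

```shell
#!/bin/sh
# Hypothetical helper: count EF Core command-log lines in log text read from stdin.
# Assumes the flood lines contain the literal text "Executed DbCommand".
count_ef_command_lines() {
    # grep -c exits non-zero when nothing matches; "|| true" keeps the exit status clean.
    grep -c 'Executed DbCommand' || true
}
```

A possible use, once the stack is up: `docker compose logs --since 5m central-1 | count_ef_command_lines`. A count at or near zero suggests the override took effect.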
**Step 2 — Memory bounds.** Add to each host service (sized with headroom; start from a measured steady-state — see Step 3):
```yaml
mem_limit: 1500m
mem_reservation: 768m
```
(Use the service-level `mem_limit`/`mem_reservation` keys supported by Compose v2, matching whatever resource style the file already uses; if it uses `deploy.resources.limits`, match that instead.)
**Step 3 — Size + verify (operational).** Bring up the full stack: `cd docker-dev && docker compose up -d`. Wait for the cluster to form, then `docker stats --no-stream` to read steady-state RSS per node; set `mem_limit` ≈ peak + ~30% headroom (adjust the Step-2 numbers). Confirm: no `OOMKilled` (`docker inspect … --format '{{.State.OOMKilled}}'` = false on all), `:9200` healthy, `:4840` listening, and the EF SQL flood is gone from `docker compose logs central-1`.
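The sizing and OOM check in Step 3 can be sketched as two small shell helpers. Both names (`mem_limit_mib`, `report_oom_killed`) are hypothetical, and the 30% headroom is the rule of thumb from the step above rather than a measured value.

```shell
#!/bin/sh
# Hypothetical helper: turn a measured peak (MiB) into a mem_limit with ~30% headroom,
# rounded up to a whole MiB. Integer arithmetic only.
mem_limit_mib() {
    peak_mib=$1
    echo $(( (peak_mib * 130 + 99) / 100 ))
}

# Hypothetical helper: print "<container name> <OOMKilled>" for every container in the
# current Compose project, including stopped ones. It needs a running Docker daemon,
# so it is defined here but not called.
report_oom_killed() {
    docker compose ps -a -q | while read -r id; do
        docker inspect "$id" --format '{{.Name}} {{.State.OOMKilled}}'
    done
}
```

For example, `mem_limit_mib 1150` prints `1495`, which is close to the `1500m` starting point in Step 2. Run `report_oom_killed` from `docker-dev/` after the cluster has formed; every line should end in `false`.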
**Step 4 — README.** Add a short note to `docker-dev/README.md`: the full single-mesh stack needs ≈ `<measured total>` of Docker Desktop VM memory; on a constrained host, raise the VM memory or run fewer host services. Mention the EF log level is pinned to Warning in dev.
**Step 5 — Commit.** `git add docker-dev/docker-compose.yml docker-dev/README.md && git commit -m "fix(docker-dev): pin EF/AspNetCore logs to Warning + per-service mem limits to stop OOM/starvation"`
---
## After all tasks
Run the affected test projects (`Runtime.Tests`, `Configuration.Tests`) + build the Host project, then use **superpowers-extended-cc:finishing-a-development-branch**: verify green, then present merge/PR/keep/discard for `feat/otopcua-followons`. Merge/push only on the user's explicit go (the user manages their own integration in this repo — coordinate, given concurrent docker-dev work on master).
@@ -0,0 +1,10 @@
{
"planPath": "docs/plans/2026-06-07-otopcua-followons.md",
"tasks": [
{"id": 1, "subject": "Task 1: Fix DriverInstanceActor subscribe ActorContext race", "status": "completed", "classification": "high-risk", "parallelizableWith": [2, 4]},
{"id": 2, "subject": "Task 2: DraftSnapshot.VirtualTags + EquipmentSignalNameCollision rule", "status": "completed", "classification": "standard", "parallelizableWith": [1, 4]},
{"id": 3, "subject": "Task 3: DraftSnapshotFactory + surgical deploy gate", "status": "completed", "classification": "high-risk", "blockedBy": [2]},
{"id": 4, "subject": "Task 4: docker-dev logging + mem limits + README", "status": "completed", "classification": "standard", "parallelizableWith": [1, 2]}
],
"lastUpdated": "2026-06-07"
}
@@ -39,6 +39,10 @@ public sealed class DraftSnapshot
public IReadOnlyList<Equipment> Equipment { get; init; } = [];
/// <summary>Gets the list of tags.</summary>
public IReadOnlyList<Tag> Tags { get; init; } = [];
/// <summary>Equipment-bound VirtualTags (script-derived signals). Shares the equipment NodeId space
/// with Tags; the collision rule checks both.</summary>
public IReadOnlyList<VirtualTag> VirtualTags { get; init; } = [];
/// <summary>Gets the list of poll groups.</summary>
public IReadOnlyList<PollGroup> PollGroups { get; init; } = [];
@@ -0,0 +1,51 @@
using Microsoft.EntityFrameworkCore;
namespace ZB.MOM.WW.OtOpcUa.Configuration.Validation;
/// <summary>
/// Materialises a <see cref="DraftSnapshot"/> from the live config DB so
/// <see cref="DraftValidator"/> can run against the current edit state at deploy time.
/// </summary>
/// <remarks>
/// <para>
/// This is a whole-DB ("global") snapshot — every cluster's rows in one pass — which is
/// what the deploy path needs: the admin-operations actor snapshots and flattens the
/// entire config, not one cluster. The validator's rules compare entity-level
/// <c>ClusterId</c> fields against each other (e.g. namespace binding), so the snapshot's
/// own <see cref="DraftSnapshot.ClusterId"/> is not read by any rule and is left empty.
/// </para>
/// <para>
/// <see cref="DraftSnapshot.GenerationId"/> is a placeholder (the generation model was
/// dropped); no rule reads it. <see cref="DraftSnapshot.PriorEquipment"/> is empty because
/// there is no prior-generation table to diff against. <see cref="DraftSnapshot.Enterprise"/>
/// / <see cref="DraftSnapshot.Site"/> are left null so the path-length rule uses its
/// conservative upper bound.
/// </para>
/// </remarks>
public static class DraftSnapshotFactory
{
/// <summary>Builds a <see cref="DraftSnapshot"/> from the current config DB rows.</summary>
/// <param name="db">The config DB context to read from.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>A snapshot populated from the live DB, ready for <see cref="DraftValidator.Validate"/>.</returns>
public static async Task<DraftSnapshot> FromConfigDbAsync(OtOpcUaConfigDbContext db, CancellationToken ct = default)
=> new DraftSnapshot
{
GenerationId = 0, // generation model dropped; placeholder (no rule reads it)
ClusterId = string.Empty, // global snapshot; rules compare entity ClusterId fields, not this
Namespaces = await db.Namespaces.AsNoTracking().ToListAsync(ct),
DriverInstances = await db.DriverInstances.AsNoTracking().ToListAsync(ct),
Devices = await db.Devices.AsNoTracking().ToListAsync(ct),
UnsAreas = await db.UnsAreas.AsNoTracking().ToListAsync(ct),
UnsLines = await db.UnsLines.AsNoTracking().ToListAsync(ct),
Equipment = await db.Equipment.AsNoTracking().ToListAsync(ct),
Tags = await db.Tags.AsNoTracking().ToListAsync(ct),
VirtualTags = await db.VirtualTags.AsNoTracking().ToListAsync(ct),
PollGroups = await db.PollGroups.AsNoTracking().ToListAsync(ct),
PriorEquipment = [],
ActiveReservations = await db.ExternalIdReservations
.AsNoTracking()
.Where(r => r.ReleasedAt == null) // active only — matches DraftSnapshot.ActiveReservations semantics
.ToListAsync(ct),
};
}
@@ -32,10 +32,37 @@ public static class DraftValidator
ValidateReservationPreflight(draft, errors);
ValidateEquipmentIdDerivation(draft, errors);
ValidateDriverNamespaceCompatibility(draft, errors);
ValidateNoEquipmentSignalNameCollision(draft, errors);
return errors;
}
private static void ValidateNoEquipmentSignalNameCollision(DraftSnapshot draft, List<ValidationError> errors)
{
// Materialiser NodeId key: "{EquipmentId}[/{FolderPath}]/{Name}". Tag (EquipmentId != null) and
// VirtualTag share this space with no DB cross-table uniqueness, so the same key from both collides.
static string Key(string eq, string? folder, string name) =>
string.IsNullOrWhiteSpace(folder) ? $"{eq}/{name}" : $"{eq}/{folder}/{name}";
var signals = draft.Tags
.Where(t => t.EquipmentId is not null)
.Select(t => (Key: Key(t.EquipmentId!, t.FolderPath, t.Name), Eq: t.EquipmentId!, t.Name))
// VirtualTag has no FolderPath column today — null is correct here; update if it ever gains one.
.Concat(draft.VirtualTags
.Select(v => (Key: Key(v.EquipmentId, null, v.Name), Eq: v.EquipmentId, v.Name)));
foreach (var g in signals.GroupBy(s => s.Key, StringComparer.Ordinal))
{
var items = g.ToList();
if (items.Count <= 1) continue;
var f = items[0];
errors.Add(new("EquipmentSignalNameCollision",
$"{items.Count} signals collide on OPC UA NodeId '{g.Key}' (equipment '{f.Eq}', name '{f.Name}'); " +
"a Name must be unique across Tag and VirtualTag within an equipment+folder",
f.Eq));
}
}
private static bool IsValidSegment(string? s) =>
s is not null && (UnsSegment.IsMatch(s) || s == UnsDefaultSegment);
@@ -8,6 +8,7 @@ using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Configuration.Validation;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations;
@@ -78,6 +79,27 @@ public sealed class AdminOperationsActor : ReceiveActor
return;
}
// Surgical pre-seal gate: reject only on a Tag↔VirtualTag NodeId collision. The other
// DraftValidator rules still run (one pass) but must NOT block here — they are dormant
// and the current non-canonical company overlay would otherwise fail them. Filter to the
// single collision code so a real OPC UA address-space clash can never be deployed.
var draft = await DraftSnapshotFactory.FromConfigDbAsync(db);
var collisions = DraftValidator.Validate(draft)
.Where(e => e.Code == "EquipmentSignalNameCollision")
.ToList();
if (collisions.Count > 0)
{
var summary = string.Join("; ", collisions.Select(e => e.Message));
_log.Warning("StartDeployment rejected (signal collision): {Summary}", summary);
replyTo.Tell(new StartDeploymentResult(
StartDeploymentOutcome.Rejected,
DeploymentId: null,
RevisionHash: null,
Message: summary,
msg.CorrelationId));
return;
}
var artifact = await ConfigComposer.SnapshotAndFlattenAsync(db);
var deploymentId = DeploymentId.NewId();
var revHash = RevisionHash.Parse(artifact.RevisionHash);
@@ -88,9 +88,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
/// </summary>
private readonly Queue<DateTime> _faultTimestamps = new();
/// <summary>Active subscription handle (null when not subscribed). Lifetime is one-per-actor —
/// re-subscribe across reconnects is the consumer's responsibility today (subscribe-once
/// semantics keep the actor simple; mux-driven re-subscribe is tracked as F8b/#113).</summary>
/// <summary>Active subscription handle (null when not subscribed). Tracks the current live
/// subscription; the actor auto-(re)subscribes on (re)connect and on each <see cref="Subscribe"/>
/// message via <see cref="ResubscribeDesired"/> / <see cref="HandleSubscribeAsync"/>, so callers
/// do not need to re-send subscription requests after a reconnect.</summary>
private ISubscriptionHandle? _subscriptionHandle;
private EventHandler<DataChangeEventArgs>? _dataChangeHandler;
@@ -314,20 +315,20 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
private async Task HandleWriteAsync(WriteAttribute msg)
{
var replyTo = Sender;
if (_driver is not IWritable writable)
{
Sender.Tell(new WriteAttributeResult(false, "Driver does not implement IWritable"));
replyTo.Tell(new WriteAttributeResult(false, "Driver does not implement IWritable"));
return;
}
var replyTo = Sender;
var request = new[] { new WriteRequest(msg.TagId, msg.Value) };
// Bound the write so a hung backend can't pin this actor forever — decision #44/#45 keeps
// retry off by default, but a stalled call still needs an answer.
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
try
{
var results = await writable.WriteAsync(request, cts.Token).ConfigureAwait(false);
var results = await writable.WriteAsync(request, cts.Token);
if (results is { Count: 1 } && IsGoodStatus(results[0].StatusCode))
{
replyTo.Tell(new WriteAttributeResult(true, null));
@@ -348,19 +349,24 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
private async Task HandleSubscribeAsync(Subscribe msg)
{
// Capture Sender/Self BEFORE any await. The re-subscribe path below awaits
// UnsubscribeAsync, and a real async backend can resume the continuation off Akka's
// ActorContext — reading raw Sender/Self/Context past that point throws
// NotSupportedException ("no active ActorContext"). Keep ConfigureAwait off the awaits
// in this handler so continuations resume on the actor context.
var replyTo = Sender;
var self = Self;
if (_driver is not ISubscribable subscribable)
{
Sender.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
replyTo.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
return;
}
if (_subscriptionHandle is not null)
{
// Subscribe-twice — drop the prior subscription before establishing the new one.
await UnsubscribeAsync().ConfigureAwait(false);
await UnsubscribeAsync();
}
var replyTo = Sender;
var self = Self;
try
{
_dataChangeHandler = (_, args) => self.Tell(new DataChangeForward(args.FullReference, args.Snapshot));
@@ -368,8 +374,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
_subscriptionHandle = await subscribable
.SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token)
.ConfigureAwait(false);
.SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token);
replyTo.Tell(new SubscriptionEstablished(_subscriptionHandle.DiagnosticId, msg.FullReferences.Count));
_log.Info("DriverInstance {Id}: subscribed to {Count} refs ({Diag})",
@@ -393,7 +398,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
try
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await subscribable.UnsubscribeAsync(_subscriptionHandle, cts.Token).ConfigureAwait(false);
await subscribable.UnsubscribeAsync(_subscriptionHandle, cts.Token);
}
catch (Exception ex)
{
@@ -0,0 +1,137 @@
using Microsoft.EntityFrameworkCore;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Configuration.Validation;
namespace ZB.MOM.WW.OtOpcUa.Configuration.Tests;
/// <summary>
/// Verifies <see cref="DraftSnapshotFactory.FromConfigDbAsync"/> materialises a
/// <see cref="DraftSnapshot"/> from the live config DB whose Tag/VirtualTag rows feed the
/// equipment-signal collision rule — the one rule wired into the deploy gate (Task 3).
/// </summary>
[Trait("Category", "Unit")]
public sealed class DraftSnapshotFactoryTests : IDisposable
{
private readonly OtOpcUaConfigDbContext _db;
/// <summary>Initializes a new instance with an isolated in-memory config DB.</summary>
public DraftSnapshotFactoryTests()
{
var options = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
.UseInMemoryDatabase($"draft-snapshot-{Guid.NewGuid():N}")
.Options;
_db = new OtOpcUaConfigDbContext(options);
}
/// <summary>Disposes the database context.</summary>
public void Dispose() => _db.Dispose();
/// <summary>Seeds one Equipment plus a Tag and a VirtualTag sharing (EquipmentId, Name); the
/// snapshot must carry both signal collections AND the validator must flag the collision.</summary>
[Fact]
public async Task FromConfigDb_populates_Tags_and_VirtualTags_and_surfaces_collision()
{
SeedEquipment("eq-1");
_db.Tags.Add(BuildTag(equipmentId: "eq-1", name: "speed"));
_db.VirtualTags.Add(BuildVirtualTag(equipmentId: "eq-1", name: "speed"));
await _db.SaveChangesAsync();
var snapshot = await DraftSnapshotFactory.FromConfigDbAsync(_db);
snapshot.Tags.Count.ShouldBe(1);
snapshot.VirtualTags.Count.ShouldBe(1);
DraftValidator.Validate(snapshot).ShouldContain(e => e.Code == "EquipmentSignalNameCollision");
}
/// <summary>A Tag and a VirtualTag with distinct names under the same equipment do not collide,
/// so the snapshot validates clean of the collision code.</summary>
[Fact]
public async Task FromConfigDb_no_collision_when_names_differ()
{
SeedEquipment("eq-1");
_db.Tags.Add(BuildTag(equipmentId: "eq-1", name: "speed"));
_db.VirtualTags.Add(BuildVirtualTag(equipmentId: "eq-1", name: "temperature"));
await _db.SaveChangesAsync();
var snapshot = await DraftSnapshotFactory.FromConfigDbAsync(_db);
snapshot.Tags.Count.ShouldBe(1);
snapshot.VirtualTags.Count.ShouldBe(1);
DraftValidator.Validate(snapshot).ShouldNotContain(e => e.Code == "EquipmentSignalNameCollision");
}
/// <summary>Seeds one active and one released reservation for the same equipment context;
/// only the active row (ReleasedAt == null) should appear in the snapshot.</summary>
[Fact]
public async Task FromConfigDb_ActiveReservations_excludes_released_rows()
{
var equipmentUuid = Guid.NewGuid();
_db.ExternalIdReservations.Add(new ExternalIdReservation
{
ReservationId = Guid.NewGuid(),
Kind = ReservationKind.ZTag,
Value = "ZT-001",
EquipmentUuid = equipmentUuid,
ClusterId = "cluster-a",
FirstPublishedBy = "test",
ReleasedAt = null, // active
});
_db.ExternalIdReservations.Add(new ExternalIdReservation
{
ReservationId = Guid.NewGuid(),
Kind = ReservationKind.ZTag,
Value = "ZT-002",
EquipmentUuid = equipmentUuid,
ClusterId = "cluster-a",
FirstPublishedBy = "test",
ReleasedAt = new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc), // released
ReleasedBy = "test",
ReleaseReason = "retired",
});
await _db.SaveChangesAsync();
var snapshot = await DraftSnapshotFactory.FromConfigDbAsync(_db);
snapshot.ActiveReservations.Count.ShouldBe(1);
snapshot.ActiveReservations[0].Value.ShouldBe("ZT-001");
snapshot.ActiveReservations[0].ReleasedAt.ShouldBeNull();
}
private void SeedEquipment(string equipmentId)
{
var uuid = Guid.NewGuid();
_db.Equipment.Add(new Equipment
{
EquipmentUuid = uuid,
EquipmentId = equipmentId,
Name = "eq",
DriverInstanceId = "d",
UnsLineId = "line-a",
MachineCode = "m",
});
}
private static Tag BuildTag(string equipmentId, string name) => new()
{
TagId = $"tag-{name}",
DriverInstanceId = "d",
EquipmentId = equipmentId,
Name = name,
DataType = "Float",
AccessLevel = TagAccessLevel.Read,
TagConfig = "{}",
};
private static VirtualTag BuildVirtualTag(string equipmentId, string name) => new()
{
VirtualTagId = $"vtag-{name}",
EquipmentId = equipmentId,
Name = name,
DataType = "Float",
ScriptId = "s-1",
};
}
@@ -186,6 +186,76 @@ public sealed class DraftValidatorTests
errors.ShouldContain(e => e.Code == "UnsSegmentInvalid");
}
// ------------------------------------------------------------------------------------
// ValidateNoEquipmentSignalNameCollision — Tag/VirtualTag NodeId collision
// ------------------------------------------------------------------------------------
/// <summary>Verifies that an equipment-bound Tag and a VirtualTag sharing the same
/// equipment+name collide on a single OPC UA NodeId.</summary>
[Fact]
public void Tag_and_VirtualTag_same_equipment_and_name_collide()
{
var draft = new DraftSnapshot
{
GenerationId = 1, ClusterId = "c",
Tags = [BuildTag(equipmentId: "eq-1", name: "speed", folderPath: null)],
VirtualTags = [BuildVirtualTag(equipmentId: "eq-1", name: "speed")],
};
DraftValidator.Validate(draft).ShouldContain(e => e.Code == "EquipmentSignalNameCollision");
}
/// <summary>Verifies that a Tag in a sub-folder does not collide with a VirtualTag at the
/// equipment root even when the names match — the NodeId keys differ.</summary>
[Fact]
public void Same_name_different_folder_does_not_collide()
{
var draft = new DraftSnapshot
{
GenerationId = 1, ClusterId = "c",
Tags = [BuildTag(equipmentId: "eq-1", name: "speed", folderPath: "metrics")],
VirtualTags = [BuildVirtualTag(equipmentId: "eq-1", name: "speed")],
};
DraftValidator.Validate(draft).ShouldNotContain(e => e.Code == "EquipmentSignalNameCollision");
}
/// <summary>Verifies that a SystemPlatform Tag (EquipmentId == null) is excluded from the
/// equipment-signal node space and so never collides with an equipment VirtualTag.</summary>
[Fact]
public void SystemPlatform_tag_sharing_name_with_equipment_vtag_excluded()
{
var draft = new DraftSnapshot
{
GenerationId = 1, ClusterId = "c",
Tags = [BuildTag(equipmentId: null, name: "speed", folderPath: null)],
VirtualTags = [BuildVirtualTag(equipmentId: "eq-1", name: "speed")],
};
DraftValidator.Validate(draft).ShouldNotContain(e => e.Code == "EquipmentSignalNameCollision");
}
private static Tag BuildTag(string? equipmentId, string name, string? folderPath) => new()
{
TagId = $"tag-{name}",
DriverInstanceId = "d",
EquipmentId = equipmentId,
Name = name,
FolderPath = folderPath,
DataType = "Float",
AccessLevel = TagAccessLevel.Read,
TagConfig = "{}",
};
private static VirtualTag BuildVirtualTag(string equipmentId, string name) => new()
{
VirtualTagId = $"vtag-{name}",
EquipmentId = equipmentId,
Name = name,
DataType = "Float",
ScriptId = "s-1",
};
// ------------------------------------------------------------------------------------
// Phase 6.3 task #148 part 2 — ValidateClusterTopology
// ------------------------------------------------------------------------------------
@@ -43,6 +43,60 @@ public sealed class AdminOperationsActorTests : ControlPlaneActorTestBase
db.ConfigEdits.Single().EntityType.ShouldBe("Deployment");
}
/// <summary>Verifies the surgical DraftValidator gate: a Tag↔VirtualTag NodeId collision in
/// the live config rejects the deploy (422-mapped <see cref="StartDeploymentOutcome.Rejected"/>)
/// before any coordinator dispatch — and inserts no Deployment row.</summary>
[Fact]
public void StartDeployment_rejects_on_Tag_VirtualTag_NodeId_collision()
{
var dbFactory = NewInMemoryDbFactory();
using (var db = dbFactory.CreateDbContext())
{
db.Equipment.Add(new Configuration.Entities.Equipment
{
EquipmentUuid = Guid.NewGuid(),
EquipmentId = "eq-1",
Name = "eq",
DriverInstanceId = "d",
UnsLineId = "line-a",
MachineCode = "m",
});
db.Tags.Add(new Configuration.Entities.Tag
{
TagId = "tag-speed",
DriverInstanceId = "d",
EquipmentId = "eq-1",
Name = "speed",
DataType = "Float",
AccessLevel = TagAccessLevel.Read,
TagConfig = "{}",
});
db.VirtualTags.Add(new Configuration.Entities.VirtualTag
{
VirtualTagId = "vtag-speed",
EquipmentId = "eq-1",
Name = "speed",
DataType = "Float",
ScriptId = "s-1",
});
db.SaveChanges();
}
var coordinator = CreateTestProbe("coord");
var actor = Sys.ActorOf(AdminOperationsActor.Props(dbFactory, coordinator.Ref, Enumerable.Empty<IDriverProbe>()));
actor.Tell(new StartDeployment("joe", CorrelationId.NewId()));
coordinator.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
var reply = ExpectMsg<StartDeploymentResult>(TimeSpan.FromSeconds(3));
reply.Outcome.ShouldBe(StartDeploymentOutcome.Rejected);
reply.Message.ShouldNotBeNull();
reply.Message.ShouldContain("collide"); // the rule's message text
using var verify = dbFactory.CreateDbContext();
verify.Deployments.Count().ShouldBe(0);
}
/// <summary>Verifies that starting a deployment is refused when another is in flight.</summary>
[Fact]
public void StartDeployment_refuses_when_another_is_in_flight()
@@ -182,6 +182,43 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
AwaitCondition(() => driver.SubscribeCount >= 2, TimeSpan.FromSeconds(3));
}
/// <summary>
/// Verifies the re-subscribe path (the second Subscribe finds a live handle and first awaits
/// UnsubscribeAsync) still replies SubscriptionEstablished. Regression for the no-ActorContext
/// race: reading Sender after `await UnsubscribeAsync().ConfigureAwait(false)` resumed off the
/// actor context and threw, so the reply never arrived. This drives the exact deploy-re-apply /
/// bootstrap-restore path where `_subscriptionHandle is not null`.
/// </summary>
[Fact]
public async Task Subscribe_twice_replies_SubscriptionEstablished_on_resubscribe()
{
// UnsubscribeYields makes the inner UnsubscribeAsync genuinely suspend, so the second
// Subscribe's `await UnsubscribeAsync()` resumes off the actor context if ConfigureAwait(false)
// is used — the exact condition that throws NotSupportedException on the subsequent Sender read.
var driver = new SubscribableStubDriver { UnsubscribeYields = true };
var parent = CreateTestProbe();
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
// First subscribe — establishes the handle.
await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(250)),
TimeSpan.FromSeconds(3));
// Second subscribe — `_subscriptionHandle is not null`, so the handler awaits
// UnsubscribeAsync first, then reads Sender. Must still reply (before the fix it threw, so no reply arrived).
var reply = await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(250)),
TimeSpan.FromSeconds(3));
reply.ReferenceCount.ShouldBe(2);
driver.SubscribeCount.ShouldBe(2);
// Old handler must have been detached before the new one was attached — no leak.
driver.OnDataChangeSubscriberCount.ShouldBe(1);
}
/// <summary>Verifies that subscribing to a non-ISubscribable driver replies with failure.</summary>
[Fact]
public async Task Subscribe_against_non_ISubscribable_replies_with_failure()
@@ -301,6 +338,12 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
/// <summary>The reference set passed to the most recent <see cref="SubscribeAsync"/> call.</summary>
public IReadOnlyList<string>? LastSubscribedRefs;
/// <summary>When true, <see cref="UnsubscribeAsync"/> genuinely suspends (it awaits a
/// TaskCompletionSource completed from a background thread) before completing, so a
/// `ConfigureAwait(false)` continuation in the actor resumes off the Akka ActorContext on a
/// thread-pool thread. This reproduces the no-ActorContext race that a synchronously-completed
/// stub task hides (the continuation otherwise runs inline).</summary>
public bool UnsubscribeYields { get; set; }
/// <summary>Subscribes to the specified full references.</summary>
/// <param name="fullReferences">The full references to subscribe to.</param>
/// <param name="publishingInterval">The publishing interval.</param>
@@ -316,8 +359,19 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
/// <summary>Unsubscribes from the specified subscription handle.</summary>
/// <param name="handle">The subscription handle.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
=> Task.CompletedTask;
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
if (UnsubscribeYields)
{
// Complete the awaited task from a fresh background thread that has NO Akka actor
// cell on it, so the caller's `ConfigureAwait(false)` continuation resumes on a
// clean thread-pool thread where InternalCurrentActorCellKeeper.Current is null —
// a deterministic repro of the real async-backend no-ActorContext race.
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_ = Task.Run(() => tcs.SetResult());
await tcs.Task.ConfigureAwait(false);
}
}
/// <summary>Fires a data change event with the specified parameters.</summary>
/// <param name="fullRef">The full reference of the data that changed.</param>