Compare commits
10 Commits
4c221ce2b3
...
fc52fbce49
| Author | SHA1 | Date | |
|---|---|---|---|
| fc52fbce49 | |||
| f817fc8a8f | |||
| 46aba992c5 | |||
| 1023209d52 | |||
| fce66d104a | |||
| 83c7149be0 | |||
| 6b36eff2d3 | |||
| 98259ab026 | |||
| ce8c0811eb | |||
| 4cb488c53e |
@@ -104,6 +104,15 @@ The `-v` drops the SQL volume; remove it to keep ConfigDb state across restarts.
|
||||
2. `docker compose -f docker-dev/docker-compose.yml stop central-1` — `central-2` should pick up the admin role-leader within ~15 s (Akka split-brain stable-after). Traefik will route traffic to `central-2` once its `/health/active` returns 200.
|
||||
3. `docker compose -f docker-dev/docker-compose.yml start central-1` — `central-1` rejoins as a follower; `central-2` keeps the leader role until something disturbs it.
|
||||
|
||||
## Resource limits & dev logging
|
||||
|
||||
The full single-mesh stack (`central-1`/`central-2` + the four site nodes) can OOM-kill `central-1` on a loaded host. Two settings in the compose file guard against that:
|
||||
|
||||
- **EF Core + ASP.NET Core logs are pinned to `Warning`** on every host node (`Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore` / `…Microsoft.AspNetCore` = `Warning`). The host logs via Serilog (`AddZbSerilog` → `ReadFrom.Configuration`), and in `Development` the default level is `Debug` — without these overrides every Deployment-poll emits an `Executed DbCommand` / `SELECT … FROM [Deployment]` line, flooding the Serilog pipeline and starving the Akka cluster heartbeat thread. Application + Akka log levels are left untouched, so this only silences the per-poll SQL chatter. To temporarily restore the SQL log flood for debugging, drop those two env vars (or set them back to `Information`) on the node you're inspecting.
|
||||
- **Each host node has `mem_limit: 1g`** (`mem_reservation: 512m`). A quiet solo `central-1` measures ~357 MiB; the limit leaves headroom for the deploy/UI load and per-cluster driver subscriptions that push a fully-loaded node higher. The limit/reservation live on the `&otopcua-host` anchor, so all six host services inherit them; `sql`, `traefik`, and the one-shot `migrator`/`cluster-seed` are left unbounded.
|
||||
|
||||
The full six-node host stack therefore needs roughly **6 GiB** of Docker Desktop VM memory just for the host nodes (plus SQL Server's own footprint on top). On a constrained host, either raise the Docker Desktop VM memory or run fewer host services (e.g. just `central-1` + `central-2`, or a single central node) rather than the full mesh.
|
||||
|
||||
## Notes
|
||||
|
||||
- This compose file is for the **local Mac/Linux developer rig**. The team's CI + soak runs go to the remote docker host at `10.100.0.35` (see `docs/v2/dev-environment.md`); the file there mirrors this one with adjusted port bindings.
|
||||
|
||||
@@ -121,6 +121,16 @@ services:
|
||||
dockerfile: docker-dev/Dockerfile
|
||||
target: runtime
|
||||
image: otopcua-host:dev
|
||||
# Per-node memory bounds. The full single-mesh stack (6 host nodes) OOM-killed
|
||||
# central-1 on a loaded host. Each host node measured ~357 MiB idle-solo and
|
||||
# climbs under the full mesh + deploy/UI load, so cap at 1g (≈peak + headroom)
|
||||
# with a 512m reservation. These top-level keys are inherited by every service
|
||||
# that uses `<<: *otopcua-host` (YAML merge keeps the anchor's scalar keys; only
|
||||
# the `environment` block is re-declared per service). Compose v2 honors
|
||||
# `mem_limit`/`mem_reservation`. The full mesh needs ~6g of Docker Desktop VM
|
||||
# memory — on a constrained host raise the VM memory or run fewer host services.
|
||||
mem_limit: 1g
|
||||
mem_reservation: 512m
|
||||
depends_on:
|
||||
sql: { condition: service_healthy }
|
||||
migrator: { condition: service_completed_successfully }
|
||||
@@ -147,6 +157,13 @@ services:
|
||||
Security__Ldap__ServiceAccountDn: "cn=serviceaccount,dc=zb,dc=local"
|
||||
Security__Ldap__ServiceAccountPassword: "serviceaccount123"
|
||||
Security__DeployApiKey: "docker-dev-deploy-key"
|
||||
# Pin EF Core + ASP.NET Core to Warning so the per-poll Deployment SELECT /
|
||||
# "Executed DbCommand" Information|Debug lines stop flooding the Serilog
|
||||
# pipeline and starving the Akka cluster heartbeat thread. The host logs via
|
||||
# Serilog (AddZbSerilog → ReadFrom.Configuration); these env vars override
|
||||
# Serilog:MinimumLevel:Override:* (app/Akka levels are left untouched).
|
||||
Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore: "Warning"
|
||||
Serilog__MinimumLevel__Override__Microsoft.AspNetCore: "Warning"
|
||||
GALAXY_MXGW_API_KEY: "${GALAXY_MXGW_API_KEY:-mxgw_otopcua2_GI7-tNozYE6cXGUSgEzL3AHDV7bYcYIHdMwKYgyHdX4}"
|
||||
ports:
|
||||
- "4840:4840"
|
||||
@@ -180,6 +197,10 @@ services:
|
||||
Security__Ldap__ServiceAccountDn: "cn=serviceaccount,dc=zb,dc=local"
|
||||
Security__Ldap__ServiceAccountPassword: "serviceaccount123"
|
||||
Security__DeployApiKey: "docker-dev-deploy-key"
|
||||
# Quiet EF/AspNetCore SQL flood — see central-1 (Serilog override). mem_limit/
|
||||
# mem_reservation are inherited from the *otopcua-host anchor.
|
||||
Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore: "Warning"
|
||||
Serilog__MinimumLevel__Override__Microsoft.AspNetCore: "Warning"
|
||||
GALAXY_MXGW_API_KEY: "${GALAXY_MXGW_API_KEY:-mxgw_otopcua2_GI7-tNozYE6cXGUSgEzL3AHDV7bYcYIHdMwKYgyHdX4}"
|
||||
ports:
|
||||
- "4841:4840"
|
||||
@@ -203,6 +224,10 @@ services:
|
||||
Cluster__PublicHostname: "site-a-1"
|
||||
Cluster__SeedNodes__0: "akka.tcp://otopcua@central-1:4053"
|
||||
Cluster__Roles__0: "driver"
|
||||
# Quiet EF/AspNetCore SQL flood — see central-1 (Serilog override). mem_limit/
|
||||
# mem_reservation are inherited from the *otopcua-host anchor.
|
||||
Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore: "Warning"
|
||||
Serilog__MinimumLevel__Override__Microsoft.AspNetCore: "Warning"
|
||||
# Resolved at runtime by GalaxyDriver.ResolveApiKey when a DriverInstance's
|
||||
# Gateway.ApiKeySecretRef = "env:GALAXY_MXGW_API_KEY".
|
||||
GALAXY_MXGW_API_KEY: "${GALAXY_MXGW_API_KEY:-mxgw_otopcua2_GI7-tNozYE6cXGUSgEzL3AHDV7bYcYIHdMwKYgyHdX4}"
|
||||
@@ -223,6 +248,8 @@ services:
|
||||
Cluster__PublicHostname: "site-a-2"
|
||||
Cluster__SeedNodes__0: "akka.tcp://otopcua@central-1:4053"
|
||||
Cluster__Roles__0: "driver"
|
||||
Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore: "Warning"
|
||||
Serilog__MinimumLevel__Override__Microsoft.AspNetCore: "Warning"
|
||||
GALAXY_MXGW_API_KEY: "${GALAXY_MXGW_API_KEY:-mxgw_otopcua2_GI7-tNozYE6cXGUSgEzL3AHDV7bYcYIHdMwKYgyHdX4}"
|
||||
ports:
|
||||
- "4843:4840"
|
||||
@@ -243,6 +270,8 @@ services:
|
||||
Cluster__PublicHostname: "site-b-1"
|
||||
Cluster__SeedNodes__0: "akka.tcp://otopcua@central-1:4053"
|
||||
Cluster__Roles__0: "driver"
|
||||
Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore: "Warning"
|
||||
Serilog__MinimumLevel__Override__Microsoft.AspNetCore: "Warning"
|
||||
GALAXY_MXGW_API_KEY: "${GALAXY_MXGW_API_KEY:-mxgw_otopcua2_GI7-tNozYE6cXGUSgEzL3AHDV7bYcYIHdMwKYgyHdX4}"
|
||||
ports:
|
||||
- "4844:4840"
|
||||
@@ -261,6 +290,8 @@ services:
|
||||
Cluster__PublicHostname: "site-b-2"
|
||||
Cluster__SeedNodes__0: "akka.tcp://otopcua@central-1:4053"
|
||||
Cluster__Roles__0: "driver"
|
||||
Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore: "Warning"
|
||||
Serilog__MinimumLevel__Override__Microsoft.AspNetCore: "Warning"
|
||||
GALAXY_MXGW_API_KEY: "${GALAXY_MXGW_API_KEY:-mxgw_otopcua2_GI7-tNozYE6cXGUSgEzL3AHDV7bYcYIHdMwKYgyHdX4}"
|
||||
ports:
|
||||
- "4845:4840"
|
||||
|
||||
@@ -0,0 +1,112 @@
|
||||
# OtOpcUa follow-ons design (2026-06-07)
|
||||
|
||||
Three independent hardening items surfaced during the Equipment-namespace live-values milestone
|
||||
(`docs/plans/2026-06-07-equipment-namespace-live-values.md`). All live in OtOpcUa. They are
|
||||
independent and ship as separate commits/tasks.
|
||||
|
||||
---
|
||||
|
||||
## 1. Fix the `DriverInstanceActor` subscribe race (Runtime — real bug)
|
||||
|
||||
**File:** `src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs`
|
||||
|
||||
**Root cause.** `HandleSubscribeAsync` (dispatched via `ReceiveAsync<Subscribe>`) captures `Sender`/`Self`
|
||||
on lines 362-363 — *after* the conditional `await UnsubscribeAsync().ConfigureAwait(false)` on line 359.
|
||||
`ConfigureAwait(false)` opts out of Akka's `ActorTaskScheduler`, so in the re-subscribe path
|
||||
(`_subscriptionHandle is not null`, which fires on every deploy re-apply and on bootstrap restore) the
|
||||
post-await `Sender` read (`Sender` = `Context.Sender`) runs on a thread-pool thread with no active actor
|
||||
context → `System.NotSupportedException: There is no active ActorContext`. It is provoked further when the
|
||||
actor is stopping mid-await (a Subscribe lands during a deploy/restore reconcile while the driver is being
|
||||
torn down) — observed live as `GalaxyDriver … shutting down` immediately followed by the error. The error
|
||||
clears after a clean node restart, which is why a non-raced bootstrap subscribed cleanly.
|
||||
|
||||
**Fix (two parts).**
|
||||
1. **Capture before await.** Move `var replyTo = Sender; var self = Self;` to the very top of
|
||||
`HandleSubscribeAsync`, before the `_subscriptionHandle is not null` check + its await. Use
|
||||
`replyTo`/`self` everywhere after; never read raw `Sender`/`Self`/`Context` past any await. This removes
|
||||
the failing line.
|
||||
2. **Drop `.ConfigureAwait(false)` inside the `ReceiveAsync` handlers** (`HandleSubscribeAsync`,
|
||||
`UnsubscribeAsync`, `HandleApplyDeltaAsync`, `HandleWriteAsync`) so continuations resume on the actor's
|
||||
`ActorTaskScheduler` (idiomatic Akka — preserves actor context, handles the stopping actor gracefully).
|
||||
Audit each handler for any other post-await `Context`/`Sender`/`Self` access while doing so. (Note:
|
||||
`HandleApplyDeltaAsync` and `HandleWriteAsync` already capture `replyTo = Sender` before their awaits;
|
||||
only `HandleSubscribeAsync` has the after-await capture — confirm during implementation.)
|
||||
|
||||
**Out of scope.** The `ReceiveAsync` mailbox-blocking semantics (one message at a time during a long
|
||||
SubscribeAsync) are unchanged — not part of this bug.
|
||||
|
||||
**Testing.** A TestKit test that subscribes, then subscribes again (drives the line-359
|
||||
unsubscribe-then-resubscribe path) and asserts `SubscriptionEstablished` is replied with no exception
|
||||
(today this throws). Use a fake `ISubscribable` driver. The stopping-mid-await race is hard to make
|
||||
deterministic; the capture-before-await fix removes the failing access regardless.
|
||||
|
||||
---
|
||||
|
||||
## 2. Validate Tag↔VirtualTag name collisions at deploy (Configuration)
|
||||
|
||||
**Files:** `src/Core/ZB.MOM.WW.OtOpcUa.Configuration/Validation/DraftValidator.cs` + the `DraftSnapshot`
|
||||
type + wherever `DraftSnapshot` is built (locate during planning).
|
||||
|
||||
**Problem.** `Phase7Applier` materialises both Equipment `Tag` variables and `VirtualTag` variables at the
|
||||
folder-scoped NodeId `{EquipmentId}[/{FolderPath}]/{Name}`. `UX_Tag_EquipmentPath` and
|
||||
`UX_VirtualTag_EquipmentPath` each enforce `(EquipmentId, Name)` uniqueness *within their own table*, but
|
||||
there is **no cross-table constraint** — a `Tag` and a `VirtualTag` with the same `(EquipmentId, Name)` (and
|
||||
same/empty FolderPath) would materialise to the same NodeId, and the sink (keyed on NodeId) would keep only
|
||||
one. Not hit today (the company overlay is VirtualTag-only), but latent for mixed equipment.
|
||||
|
||||
**Change.**
|
||||
- Extend `DraftSnapshot` to carry equipment-bound signal identities: the `(EquipmentId, FolderPath, Name)`
|
||||
of every `Tag` with a non-null `EquipmentId` and every `VirtualTag` (FolderPath = "" for VirtualTags).
|
||||
Populate them where `DraftSnapshot` is constructed.
|
||||
- New rule `DraftValidator.ValidateNoEquipmentSignalNameCollision`: group those signals by the
|
||||
**folder-scoped key** `(EquipmentId, normalize(FolderPath), Name)` — exactly the materialiser's NodeId key
|
||||
— and emit a `ValidationError("EquipmentSignalNameCollision", …)` for any group with count > 1 (carry the
|
||||
EquipmentId + Name in the error). Wire it into `DraftValidator.Validate`.
|
||||
|
||||
**Why the folder-scoped key.** A Tag under a sub-folder (`{Eq}/sub/Name`) does not collide with a root
|
||||
VirtualTag (`{Eq}/Name`) of the same Name. Keying on the full NodeId path avoids false positives. Galaxy /
|
||||
SystemPlatform Tags (`EquipmentId` null) live in a different node space and are excluded.
|
||||
|
||||
**Testing.** Validator unit tests: a Tag + a VirtualTag with the same `(EquipmentId, Name)` (empty
|
||||
FolderPath) → `EquipmentSignalNameCollision`; same Name but different FolderPath → OK; distinct names → OK;
|
||||
a SystemPlatform Tag sharing a name with an equipment VirtualTag → OK (excluded).
|
||||
|
||||
---
|
||||
|
||||
## 3. docker-dev resource hardening (compose/config only — no app code)
|
||||
|
||||
**File:** `docker-dev/docker-compose.yml` (+ `docker-dev/README.md`).
|
||||
|
||||
**Problem.** The full single-mesh stack (central-1/2 + 4 site nodes) OOM-killed central-1 on a loaded host
|
||||
(Docker Desktop VM memory cap; `OOMKilled=true`, host load ~12). Two contributors: (a) verbose EF Core
|
||||
`Development` logging — every Deployment-poll `SELECT` + `Executed DbCommand` logged, flooding the dispatcher
|
||||
and starving the cluster heartbeat thread (observed `heartbeat … delayed 4169ms … thread starvation`);
|
||||
(b) no per-container memory limits, so total footprint is unbounded against the VM cap.
|
||||
|
||||
**Change.**
|
||||
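- **Quiet logging:** add `Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore=Warning` (and
|
||||
`Serilog__MinimumLevel__Override__Microsoft.AspNetCore=Warning`) to the host services' `environment` — the host logs via Serilog (`ReadFrom.Configuration`), so the override lives under `Serilog:MinimumLevel:Override`, not `Logging:LogLevel`. Keep application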
|
||||
logs (`Default`/`ZB.MOM.WW`) at Info. This removes the SQL flood and the starvation cascade.
|
||||
- **Per-service memory bounds:** add `mem_limit` (and `mem_reservation`) to each host service (central-1/2,
|
||||
site-a/b), sized by measuring a steady-state node (`docker stats`) and adding headroom. Document in
|
||||
`docker-dev/README.md` the approximate total Docker Desktop VM memory the full stack needs, so a
|
||||
constrained host knows to raise the VM or run fewer nodes.
|
||||
|
||||
**Coordinate with active docker-dev work.** `master` has recent in-flight docker-dev commits
|
||||
(single-mesh topology, fresh-volume bootstrap). Rebase/merge carefully so these changes don't clobber that
|
||||
work; keep the edits additive (env keys + `mem_limit` lines) where possible.
|
||||
|
||||
**Testing.** Operational only: bring the full stack up with the changes, confirm no OOM and all nodes serve
|
||||
(`:9200` healthy, `:4840` listening on central-1, galaxy mirror Good). Not unit-testable.
|
||||
|
||||
---
|
||||
|
||||
## Sequencing & risk
|
||||
|
||||
| # | Area | Risk | Ships as |
|
||||
|---|---|---|---|
|
||||
| 1 | Runtime actor (`DriverInstanceActor`) | Medium (actor semantics) — small, well-understood | own commit + TestKit test |
|
||||
| 2 | Configuration (`DraftSnapshot` + `DraftValidator`) | Low–medium (new snapshot field + rule) | own commit + validator tests |
|
||||
| 3 | docker-dev compose/README | Low (config only) | own commit; coordinate with active docker-dev work |
|
||||
|
||||
All three are independent and can be reviewed/merged separately. No cross-dependencies.
|
||||
@@ -0,0 +1,286 @@
|
||||
# OtOpcUa Follow-ons Implementation Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans (or subagent-driven-development) to implement this plan task-by-task.
|
||||
|
||||
**Goal:** Ship three independent hardening items from `docs/plans/2026-06-07-otopcua-followons-design.md`: (1) fix the `DriverInstanceActor` subscribe ActorContext race, (2) reject Tag↔VirtualTag NodeId collisions at deploy via a surgical validator gate, (3) make the docker-dev stack survive a constrained host.
|
||||
|
||||
**Architecture:** All three live in OtOpcUa, on branch `feat/otopcua-followons` (off `master` `4c221ce`). They are independent (no cross-deps except T3→T2). #1 is a Runtime actor fix; #2 wires the (currently dormant) `DraftValidator` into the deploy handler but gates *only* on the new collision rule (other dormant rules run but don't block — avoids breaking the existing non-canonical company overlay); #3 is docker-dev config only.
|
||||
|
||||
**Tech Stack:** .NET 10, Akka.NET (TestKit), EF Core (`OtOpcUaConfigDbContext`), xUnit + Shouldly, Docker Compose.
|
||||
|
||||
**Decisions locked (from brainstorming):** #2 = **surgical gate** (reject only on `EquipmentSignalNameCollision`; build the `DraftSnapshot` + wire `DraftValidator.Validate` so it's future-ready, but filter the gate to the one code). #3 = quiet EF/AspNetCore logging + per-service `mem_limit`/`mem_reservation` + README (no lite profile).
|
||||
|
||||
**Environment caveat:** docker-dev is being actively reconfigured by the user (DB reset, single-mesh, fresh-volume bootstrap). Do **not** rely on live-DB state for tests — #2's tests are unit/TestKit with synthetic snapshots; #3 is verified by bringing the stack up. Coordinate #3's edits with the in-flight docker-dev commits on master (keep them additive).
|
||||
|
||||
---
|
||||
|
||||
## Task graph
|
||||
|
||||
```
|
||||
T1 (#1 subscribe race) ┐ independent, disjoint files
|
||||
T2 (#2 snapshot field + rule) ┤ → T3 (#2 deploy gate)
|
||||
T4 (#3 docker-dev config) ┘
|
||||
```
|
||||
T1, T2, T4 are mutually parallelizable. T3 depends on T2.
|
||||
|
||||
---
|
||||
|
||||
### Task 1: Fix the `DriverInstanceActor` subscribe ActorContext race
|
||||
|
||||
**Classification:** high-risk
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** Task 2, Task 4
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs` (`HandleSubscribeAsync` ~349-384, `UnsubscribeAsync` ~386-403, `HandleWriteAsync` ~315-345 — drop `ConfigureAwait(false)`)
|
||||
- Test: `tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs`
|
||||
|
||||
**Root cause (recap):** `HandleSubscribeAsync` reads `Sender` (= `Context.Sender`) on line 362 *after* a conditional `await UnsubscribeAsync().ConfigureAwait(false)` on line 359. `ConfigureAwait(false)` resumes the continuation off Akka's `ActorTaskScheduler`, so the re-subscribe path reads `Context.Sender` on a thread-pool thread → `no active ActorContext`.
|
||||
|
||||
**Step 1 — Write the failing test.** In `DriverInstanceActorTests.cs` (read it first for the existing fake-`ISubscribable` driver + TestKit harness pattern; reuse the fake driver that drives the actor to the `Connected` state). Add a test that subscribes, then subscribes AGAIN (so `_subscriptionHandle is not null` → drives the line-359 unsubscribe-then-resubscribe path):
|
||||
```csharp
|
||||
[Fact]
|
||||
public void Resubscribe_does_not_throw_no_active_ActorContext()
|
||||
{
|
||||
var (actor, _) = CreateConnectedSubscribableDriver(); // mirror the existing helper
|
||||
actor.Tell(new DriverInstanceActor.Subscribe(new[] { "ref.A" }, TimeSpan.FromSeconds(1)));
|
||||
ExpectMsg<DriverInstanceActor.SubscriptionEstablished>();
|
||||
// Second subscribe exercises the await-Unsubscribe-then-read-Sender path that threw.
|
||||
actor.Tell(new DriverInstanceActor.Subscribe(new[] { "ref.B" }, TimeSpan.FromSeconds(1)));
|
||||
ExpectMsg<DriverInstanceActor.SubscriptionEstablished>(TimeSpan.FromSeconds(5));
|
||||
}
|
||||
```
|
||||
(Match the real message type names/namespaces from the actor. If the existing tests already have a "subscribe" test, clone its setup.)
|
||||
|
||||
**Step 2 — Run, verify it fails.** `dotnet test tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ --filter "FullyQualifiedName~DriverInstanceActor"` — expect the new test to fail (the second subscribe throws `no active ActorContext`, so no `SubscriptionEstablished` arrives → `ExpectMsg` times out).
|
||||
|
||||
**Step 3 — Implement.** In `HandleSubscribeAsync`, move the `Sender`/`Self` capture to the **top**, before the `ISubscribable` check and the conditional unsubscribe await; use the captured locals everywhere after; and drop `.ConfigureAwait(false)` from the awaits inside this and the sibling `ReceiveAsync` handlers:
|
||||
```csharp
|
||||
private async Task HandleSubscribeAsync(Subscribe msg)
|
||||
{
|
||||
var replyTo = Sender; // capture BEFORE any await (Context.Sender is invalid post-await)
|
||||
var self = Self;
|
||||
if (_driver is not ISubscribable subscribable)
|
||||
{
|
||||
replyTo.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
|
||||
return;
|
||||
}
|
||||
if (_subscriptionHandle is not null)
|
||||
{
|
||||
await UnsubscribeAsync(); // no ConfigureAwait(false) — resume on the actor context
|
||||
}
|
||||
try
|
||||
{
|
||||
_dataChangeHandler = (_, args) => self.Tell(new DataChangeForward(args.FullReference, args.Snapshot));
|
||||
subscribable.OnDataChange += _dataChangeHandler;
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
||||
_subscriptionHandle = await subscribable
|
||||
.SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token); // no ConfigureAwait(false)
|
||||
replyTo.Tell(new SubscriptionEstablished(_subscriptionHandle.DiagnosticId, msg.FullReferences.Count));
|
||||
_log.Info("DriverInstance {Id}: subscribed to {Count} refs ({Diag})",
|
||||
_driverInstanceId, msg.FullReferences.Count, _subscriptionHandle.DiagnosticId);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
DetachSubscription();
|
||||
_log.Warning(ex, "DriverInstance {Id}: subscribe failed", _driverInstanceId);
|
||||
replyTo.Tell(new SubscriptionFailed(ex.Message));
|
||||
}
|
||||
}
|
||||
```
|
||||
Also remove `.ConfigureAwait(false)` from the awaits in `UnsubscribeAsync` (line ~396) and `HandleWriteAsync` (line ~330) so every `ReceiveAsync` continuation resumes on the actor context. (`HandleApplyDeltaAsync` already captures `Sender` first and has no `ConfigureAwait(false)` — leave it.) Audit each handler: no raw `Sender`/`Self`/`Context` access past any await.
|
||||
|
||||
**Step 4 — Run, verify it passes.** Same filter → green. Then run the broader Runtime driver slice: `dotnet test tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ --filter "FullyQualifiedName~DriverInstance|FullyQualifiedName~DriverHost"` — no regression.
|
||||
|
||||
**Step 5 — Commit.** `git add … && git commit -m "fix(runtime): capture Sender before await in DriverInstanceActor subscribe (no-ActorContext race)"`
|
||||
|
||||
---
|
||||
|
||||
### Task 2: Add `VirtualTags` to `DraftSnapshot` + the collision rule
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** Task 1, Task 4
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/Core/ZB.MOM.WW.OtOpcUa.Configuration/Validation/DraftSnapshot.cs` (add `VirtualTags`)
|
||||
- Modify: `src/Core/ZB.MOM.WW.OtOpcUa.Configuration/Validation/DraftValidator.cs` (add rule + wire into `Validate`)
|
||||
- Test: `tests/Core/ZB.MOM.WW.OtOpcUa.Configuration.Tests/DraftValidatorTests.cs`
|
||||
|
||||
**Step 1 — Write failing tests.** In `DraftValidatorTests.cs` (read it for the snapshot-builder helper the existing tests use — they construct `DraftSnapshot` via object initializer with the entities). Add:
|
||||
```csharp
|
||||
[Fact]
|
||||
public void Tag_and_VirtualTag_same_equipment_and_name_collide()
|
||||
{
|
||||
var draft = MakeDraft(/* one Equipment eq-1 */) with-ish initializer:
|
||||
Tags = [ new Tag { TagId="t1", EquipmentId="eq-1", Name="speed", DataType="Float64", FolderPath=null, /*required fields*/ } ],
|
||||
VirtualTags = [ new VirtualTag { VirtualTagId="v1", EquipmentId="eq-1", Name="speed", DataType="Float64", ScriptId="s1" } ];
|
||||
DraftValidator.Validate(draft).ShouldContain(e => e.Code == "EquipmentSignalNameCollision");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Same_name_different_folder_does_not_collide()
|
||||
{
|
||||
// Tag under FolderPath "metrics" vs VirtualTag at equipment root → different NodeId → OK
|
||||
... Tags=[Tag{EquipmentId="eq-1",Name="speed",FolderPath="metrics"}], VirtualTags=[VirtualTag{EquipmentId="eq-1",Name="speed"}]
|
||||
DraftValidator.Validate(draft).ShouldNotContain(e => e.Code == "EquipmentSignalNameCollision");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SystemPlatform_tag_sharing_name_with_equipment_vtag_is_excluded()
|
||||
{
|
||||
// Tag with EquipmentId == null (galaxy/SystemPlatform) never collides with an equipment vtag
|
||||
... Tags=[Tag{EquipmentId=null,Name="speed"}], VirtualTags=[VirtualTag{EquipmentId="eq-1",Name="speed"}]
|
||||
DraftValidator.Validate(draft).ShouldNotContain(e => e.Code == "EquipmentSignalNameCollision");
|
||||
}
|
||||
```
|
||||
(Fill in every `required`/NOT-NULL field on `Tag`/`VirtualTag`/`Equipment` — read the entity classes `src/Core/ZB.MOM.WW.OtOpcUa.Configuration/Entities/{Tag,VirtualTag,Equipment}.cs`. Mirror how existing tests build entities.)
|
||||
|
||||
**Step 2 — Run, verify fail** (compile error: `VirtualTags` not on `DraftSnapshot`; then rule missing).
|
||||
`dotnet test tests/Core/ZB.MOM.WW.OtOpcUa.Configuration.Tests/ --filter "FullyQualifiedName~DraftValidator"`
|
||||
|
||||
**Step 3 — Implement.**
|
||||
In `DraftSnapshot.cs`, after the `Tags` member:
|
||||
```csharp
|
||||
/// <summary>Equipment-bound VirtualTags (script-derived signals). Used by the NodeId-collision rule.</summary>
|
||||
public IReadOnlyList<VirtualTag> VirtualTags { get; init; } = [];
|
||||
```
|
||||
In `DraftValidator.cs`, add the rule and call it from `Validate`:
|
||||
```csharp
|
||||
private static void ValidateNoEquipmentSignalNameCollision(DraftSnapshot draft, List<ValidationError> errors)
|
||||
{
|
||||
// The materialiser keys equipment signal variables on the folder-scoped NodeId
|
||||
// "{EquipmentId}[/{FolderPath}]/{Name}". Tag (EquipmentId != null) and VirtualTag share that
|
||||
// node space with no DB-level cross-table uniqueness, so the same key from both collides.
|
||||
static string Key(string eq, string? folder, string name) =>
|
||||
string.IsNullOrWhiteSpace(folder) ? $"{eq}/{name}" : $"{eq}/{folder}/{name}";
|
||||
|
||||
var signals = draft.Tags
|
||||
.Where(t => t.EquipmentId is not null)
|
||||
.Select(t => (Key: Key(t.EquipmentId!, t.FolderPath, t.Name), Eq: t.EquipmentId!, t.Name))
|
||||
.Concat(draft.VirtualTags
|
||||
.Select(v => (Key: Key(v.EquipmentId, null, v.Name), Eq: v.EquipmentId, v.Name)));
|
||||
|
||||
foreach (var g in signals.GroupBy(s => s.Key, StringComparer.Ordinal).Where(g => g.Count() > 1))
|
||||
{
|
||||
var f = g.First();
|
||||
errors.Add(new("EquipmentSignalNameCollision",
|
||||
$"{g.Count()} signals collide on OPC UA NodeId '{g.Key}' (equipment '{f.Eq}', name '{f.Name}'); " +
|
||||
"a Name must be unique across Tag and VirtualTag within an equipment+folder",
|
||||
f.Eq));
|
||||
}
|
||||
}
|
||||
```
|
||||
Add `ValidateNoEquipmentSignalNameCollision(draft, errors);` to the `Validate` method's rule list (around line 33). Confirm the `VirtualTag` entity exposes `EquipmentId`, `Name` (it does); it has no `FolderPath` (so vtags use `null` folder).
|
||||
|
||||
**Step 4 — Run, verify pass.** Same filter → green; then the whole Configuration.Tests suite to confirm no regression.
|
||||
|
||||
**Step 5 — Commit.** `git add … && git commit -m "feat(config): DraftValidator rule + DraftSnapshot.VirtualTags for Tag/VirtualTag NodeId collisions"`
|
||||
|
||||
---
|
||||
|
||||
### Task 3: Build `DraftSnapshot` from the DB + wire the surgical deploy gate
|
||||
|
||||
**Classification:** high-risk
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none (depends on Task 2)
|
||||
|
||||
**Files:**
|
||||
- Create: `src/Core/ZB.MOM.WW.OtOpcUa.Configuration/Validation/DraftSnapshotFactory.cs`
|
||||
- Modify: `src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/AdminOperations/AdminOperationsActor.cs` (`HandleStartDeploymentAsync`, before the `ConfigComposer.SnapshotAndFlattenAsync` seal)
|
||||
- Read first: `AdminOperationsActor.cs` (the `db` in scope + the `StartDeploymentOutcome` enum incl. `Rejected` + the `StartDeploymentResult` shape), and `src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Api/DeployApiEndpoints.cs` (confirm `Rejected` maps to a non-2xx HTTP — if not, map it, e.g. 422).
|
||||
- Test: `tests/Core/ZB.MOM.WW.OtOpcUa.Configuration.Tests/DraftSnapshotFactoryTests.cs` (new) + extend an `AdminOperationsActor` test if one exists (TestKit) to assert a colliding config is Rejected.
|
||||
|
||||
**Design:** Build a `DraftSnapshot` from the live config DB and run the FULL `DraftValidator.Validate`, but **gate the deploy only on `EquipmentSignalNameCollision`** — other (dormant, possibly-failing) rules run but do not block. This wires the validator for future activation without breaking the non-canonical company overlay.
|
||||
|
||||
**Step 1 — Write failing tests.**
|
||||
`DraftSnapshotFactoryTests.cs`: using an in-memory `OtOpcUaConfigDbContext` (mirror how other Configuration.Tests build one) seeded with one Equipment + a Tag and a VirtualTag sharing (EquipmentId, Name), assert `DraftSnapshotFactory.FromConfigDbAsync(db)` returns a snapshot whose `Tags` and `VirtualTags` are populated, and `DraftValidator.Validate(snapshot)` contains `EquipmentSignalNameCollision`.
|
||||
If `AdminOperationsActor` has a TestKit test harness, add: seed a colliding config → `StartDeployment` → `StartDeploymentResult` outcome is `Rejected` with the collision message; seed a non-colliding config → `Accepted` (sealed).
|
||||
|
||||
**Step 2 — Run, verify fail.**
|
||||
|
||||
**Step 3 — Implement.**
|
||||
`DraftSnapshotFactory.cs` — load the tables the rules touch (so they don't NRE), gate-relevant ones fully populated:
|
||||
```csharp
|
||||
public static class DraftSnapshotFactory
|
||||
{
|
||||
public static async Task<DraftSnapshot> FromConfigDbAsync(OtOpcUaConfigDbContext db, CancellationToken ct = default)
|
||||
=> new DraftSnapshot
|
||||
{
|
||||
GenerationId = 0, // generation model dropped; placeholder (no rule reads it)
|
||||
ClusterId = string.Empty, // global snapshot; rules compare entity ClusterId fields, not this
|
||||
Namespaces = await db.Namespaces.AsNoTracking().ToListAsync(ct),
|
||||
DriverInstances = await db.DriverInstances.AsNoTracking().ToListAsync(ct),
|
||||
Devices = await db.Devices.AsNoTracking().ToListAsync(ct),
|
||||
UnsAreas = await db.UnsAreas.AsNoTracking().ToListAsync(ct),
|
||||
UnsLines = await db.UnsLines.AsNoTracking().ToListAsync(ct),
|
||||
Equipment = await db.Equipment.AsNoTracking().ToListAsync(ct),
|
||||
Tags = await db.Tags.AsNoTracking().ToListAsync(ct),
|
||||
VirtualTags = await db.VirtualTags.AsNoTracking().ToListAsync(ct),
|
||||
PollGroups = await db.PollGroups.AsNoTracking().ToListAsync(ct),
|
||||
PriorEquipment = [], // no prior generation in the live model (UUID-immutability rule no-ops)
|
||||
ActiveReservations= await db.ExternalIdReservations.AsNoTracking().ToListAsync(ct),
|
||||
// Enterprise/Site left null — only PathLength reads them (under-counts; acceptable, not gated here)
|
||||
};
|
||||
}
|
||||
```
|
||||
(Confirm the DbSet names: `db.ExternalIdReservations` etc. — read `OtOpcUaConfigDbContext`. Adjust if different.)
|
||||
|
||||
In `AdminOperationsActor.HandleStartDeploymentAsync`, immediately before `ConfigComposer.SnapshotAndFlattenAsync(db)`:
|
||||
```csharp
|
||||
var draft = await DraftSnapshotFactory.FromConfigDbAsync(db);
|
||||
var collisions = DraftValidator.Validate(draft)
|
||||
.Where(e => e.Code == "EquipmentSignalNameCollision")
|
||||
.ToList();
|
||||
if (collisions.Count > 0)
|
||||
{
|
||||
var summary = string.Join("; ", collisions.Select(e => e.Message));
|
||||
_log.Warning("StartDeployment rejected: {Summary}", summary);
|
||||
replyTo.Tell(new StartDeploymentResult(StartDeploymentOutcome.Rejected, /* id */ null, summary /* match the result shape */));
|
||||
return;
|
||||
}
|
||||
```
|
||||
(Match the real `StartDeploymentResult` constructor + how `replyTo`/the failure path is shaped at lines ~108-119.) If `DeployApiEndpoints` doesn't already map `Rejected` to a non-2xx, map it to **422 Unprocessable Entity** with the message.
|
||||
|
||||
**Step 4 — Run, verify pass.** Configuration.Tests + the AdminOperations slice green. Build the Host project to confirm the endpoint mapping compiles.
|
||||
|
||||
**Step 5 — Commit.** `git add … && git commit -m "feat(deploy): reject Tag/VirtualTag NodeId collisions at deploy (surgical DraftValidator gate)"`
|
||||
|
||||
---
|
||||
|
||||
### Task 4: docker-dev resource hardening (logging + memory + README)
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** Task 1, Task 2
|
||||
|
||||
**Files:**
|
||||
- Modify: `docker-dev/docker-compose.yml` (the host-service env + per-service limits; use the `&otopcua-host` anchor / per-service blocks)
|
||||
- Modify: `docker-dev/README.md` (memory note)
|
||||
- Read first: the current compose to see the shared anchor + each host service's `environment` block, and **rebase/coordinate** with the in-flight docker-dev commits on master (keep edits additive).
|
||||
|
||||
**Step 1 — Quiet logging.** Add to the host services' `environment` (prefer the `&otopcua-host` anchor so all inherit; if envs are per-service, add to each of central-1/2, site-a-1/2, site-b-1/2):
|
||||
```yaml
|
||||
Serilog__MinimumLevel__Override__Microsoft.EntityFrameworkCore: "Warning"
|
||||
Serilog__MinimumLevel__Override__Microsoft.AspNetCore: "Warning"
|
||||
```
|
||||
(Keep `Default`/app loggers at their current level. This removes the per-poll `SELECT FROM Deployment` + `Executed DbCommand` flood that drove the heartbeat thread-starvation.)
|
||||
|
||||
**Step 2 — Memory bounds.** Add to each host service (sized with headroom; start from a measured steady-state — see Step 3):
|
||||
```yaml
|
||||
mem_limit: 1500m
|
||||
mem_reservation: 768m
|
||||
```
|
||||
(Use the Compose v2 service-level `mem_limit`/`mem_reservation` keys — they sit directly under each service, or on the shared `&otopcua-host` anchor — matching whatever resource style the file already uses; if it uses `deploy.resources.limits`, match that instead.)
|
||||
|
||||
**Step 3 — Size + verify (operational).** Bring up the full stack: `cd docker-dev && docker compose up -d`. Wait for the cluster to form, then `docker stats --no-stream` to read steady-state RSS per node; set `mem_limit` ≈ peak + ~30% headroom (adjust the Step-2 numbers). Confirm: no `OOMKilled` (`docker inspect … --format '{{.State.OOMKilled}}'` = false on all), `:9200` healthy, `:4840` listening, and the EF SQL flood is gone from `docker compose logs central-1`.
|
||||
|
||||
**Step 4 — README.** Add a short note to `docker-dev/README.md`: the full single-mesh stack needs ≈ `<measured total>` of Docker Desktop VM memory; on a constrained host, raise the VM memory or run fewer host services. Mention the EF log level is pinned to Warning in dev.
|
||||
|
||||
**Step 5 — Commit.** `git add docker-dev/docker-compose.yml docker-dev/README.md && git commit -m "fix(docker-dev): pin EF/AspNetCore logs to Warning + per-service mem limits to stop OOM/starvation"`
|
||||
|
||||
---
|
||||
|
||||
## After all tasks
|
||||
|
||||
Run the affected test projects (`Runtime.Tests`, `Configuration.Tests`) + build the Host project, then use **superpowers-extended-cc:finishing-a-development-branch**: verify green, then present merge/PR/keep/discard for `feat/otopcua-followons`. Merge/push only on the user's explicit go (the user manages their own integration in this repo — coordinate, given concurrent docker-dev work on master).
|
||||
@@ -0,0 +1,10 @@
|
||||
{
|
||||
"planPath": "docs/plans/2026-06-07-otopcua-followons.md",
|
||||
"tasks": [
|
||||
{"id": 1, "subject": "Task 1: Fix DriverInstanceActor subscribe ActorContext race", "status": "completed", "classification": "high-risk", "parallelizableWith": [2, 4]},
|
||||
{"id": 2, "subject": "Task 2: DraftSnapshot.VirtualTags + EquipmentSignalNameCollision rule", "status": "completed", "classification": "standard", "parallelizableWith": [1, 4]},
|
||||
{"id": 3, "subject": "Task 3: DraftSnapshotFactory + surgical deploy gate", "status": "completed", "classification": "high-risk", "blockedBy": [2]},
|
||||
{"id": 4, "subject": "Task 4: docker-dev logging + mem limits + README", "status": "completed", "classification": "standard", "parallelizableWith": [1, 2]}
|
||||
],
|
||||
"lastUpdated": "2026-06-07"
|
||||
}
|
||||
@@ -39,6 +39,10 @@ public sealed class DraftSnapshot
|
||||
public IReadOnlyList<Equipment> Equipment { get; init; } = [];
|
||||
/// <summary>Gets the list of tags.</summary>
|
||||
public IReadOnlyList<Tag> Tags { get; init; } = [];
|
||||
|
||||
/// <summary>Equipment-bound VirtualTags (script-derived signals). Shares the equipment NodeId space
|
||||
/// with Tags; the collision rule checks both.</summary>
|
||||
public IReadOnlyList<VirtualTag> VirtualTags { get; init; } = [];
|
||||
/// <summary>Gets the list of poll groups.</summary>
|
||||
public IReadOnlyList<PollGroup> PollGroups { get; init; } = [];
|
||||
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Configuration.Validation;
|
||||
|
||||
/// <summary>
|
||||
/// Materialises a <see cref="DraftSnapshot"/> from the live config DB so
|
||||
/// <see cref="DraftValidator"/> can run against the current edit state at deploy time.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// This is a whole-DB ("global") snapshot — every cluster's rows in one pass — which is
|
||||
/// what the deploy path needs: the admin-operations actor snapshots and flattens the
|
||||
/// entire config, not one cluster. The validator's rules compare entity-level
|
||||
/// <c>ClusterId</c> fields against each other (e.g. namespace binding), so the snapshot's
|
||||
/// own <see cref="DraftSnapshot.ClusterId"/> is not read by any rule and is left empty.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <see cref="DraftSnapshot.GenerationId"/> is a placeholder (the generation model was
|
||||
/// dropped); no rule reads it. <see cref="DraftSnapshot.PriorEquipment"/> is empty because
|
||||
/// there is no prior-generation table to diff against. <see cref="DraftSnapshot.Enterprise"/>
|
||||
/// / <see cref="DraftSnapshot.Site"/> are left null so the path-length rule uses its
|
||||
/// conservative upper bound.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public static class DraftSnapshotFactory
|
||||
{
|
||||
/// <summary>Builds a <see cref="DraftSnapshot"/> from the current config DB rows.</summary>
|
||||
/// <param name="db">The config DB context to read from.</param>
|
||||
/// <param name="ct">Cancellation token.</param>
|
||||
/// <returns>A snapshot populated from the live DB, ready for <see cref="DraftValidator.Validate"/>.</returns>
|
||||
public static async Task<DraftSnapshot> FromConfigDbAsync(OtOpcUaConfigDbContext db, CancellationToken ct = default)
|
||||
=> new DraftSnapshot
|
||||
{
|
||||
GenerationId = 0, // generation model dropped; placeholder (no rule reads it)
|
||||
ClusterId = string.Empty, // global snapshot; rules compare entity ClusterId fields, not this
|
||||
Namespaces = await db.Namespaces.AsNoTracking().ToListAsync(ct),
|
||||
DriverInstances = await db.DriverInstances.AsNoTracking().ToListAsync(ct),
|
||||
Devices = await db.Devices.AsNoTracking().ToListAsync(ct),
|
||||
UnsAreas = await db.UnsAreas.AsNoTracking().ToListAsync(ct),
|
||||
UnsLines = await db.UnsLines.AsNoTracking().ToListAsync(ct),
|
||||
Equipment = await db.Equipment.AsNoTracking().ToListAsync(ct),
|
||||
Tags = await db.Tags.AsNoTracking().ToListAsync(ct),
|
||||
VirtualTags = await db.VirtualTags.AsNoTracking().ToListAsync(ct),
|
||||
PollGroups = await db.PollGroups.AsNoTracking().ToListAsync(ct),
|
||||
PriorEquipment = [],
|
||||
ActiveReservations = await db.ExternalIdReservations
|
||||
.AsNoTracking()
|
||||
.Where(r => r.ReleasedAt == null) // active only — matches DraftSnapshot.ActiveReservations semantics
|
||||
.ToListAsync(ct),
|
||||
};
|
||||
}
|
||||
@@ -32,10 +32,37 @@ public static class DraftValidator
|
||||
ValidateReservationPreflight(draft, errors);
|
||||
ValidateEquipmentIdDerivation(draft, errors);
|
||||
ValidateDriverNamespaceCompatibility(draft, errors);
|
||||
ValidateNoEquipmentSignalNameCollision(draft, errors);
|
||||
|
||||
return errors;
|
||||
}
|
||||
|
||||
private static void ValidateNoEquipmentSignalNameCollision(DraftSnapshot draft, List<ValidationError> errors)
|
||||
{
|
||||
// Materialiser NodeId key: "{EquipmentId}[/{FolderPath}]/{Name}". Tag (EquipmentId != null) and
|
||||
// VirtualTag share this space with no DB cross-table uniqueness, so the same key from both collides.
|
||||
static string Key(string eq, string? folder, string name) =>
|
||||
string.IsNullOrWhiteSpace(folder) ? $"{eq}/{name}" : $"{eq}/{folder}/{name}";
|
||||
|
||||
var signals = draft.Tags
|
||||
.Where(t => t.EquipmentId is not null)
|
||||
.Select(t => (Key: Key(t.EquipmentId!, t.FolderPath, t.Name), Eq: t.EquipmentId!, t.Name))
|
||||
// VirtualTag has no FolderPath column today — null is correct here; update if it ever gains one.
|
||||
.Concat(draft.VirtualTags
|
||||
.Select(v => (Key: Key(v.EquipmentId, null, v.Name), Eq: v.EquipmentId, v.Name)));
|
||||
|
||||
foreach (var g in signals.GroupBy(s => s.Key, StringComparer.Ordinal))
|
||||
{
|
||||
var items = g.ToList();
|
||||
if (items.Count <= 1) continue;
|
||||
var f = items[0];
|
||||
errors.Add(new("EquipmentSignalNameCollision",
|
||||
$"{items.Count} signals collide on OPC UA NodeId '{g.Key}' (equipment '{f.Eq}', name '{f.Name}'); " +
|
||||
"a Name must be unique across Tag and VirtualTag within an equipment+folder",
|
||||
f.Eq));
|
||||
}
|
||||
}
|
||||
|
||||
private static bool IsValidSegment(string? s) =>
|
||||
s is not null && (UnsSegment.IsMatch(s) || s == UnsDefaultSegment);
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Validation;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.AdminOperations;
|
||||
@@ -78,6 +79,27 @@ public sealed class AdminOperationsActor : ReceiveActor
|
||||
return;
|
||||
}
|
||||
|
||||
// Surgical pre-seal gate: reject only on a Tag↔VirtualTag NodeId collision. The other
|
||||
// DraftValidator rules still run (one pass) but must NOT block here — they are dormant
|
||||
// and the current non-canonical company overlay would otherwise fail them. Filter to the
|
||||
// single collision code so a real OPC UA address-space clash can never be deployed.
|
||||
var draft = await DraftSnapshotFactory.FromConfigDbAsync(db);
|
||||
var collisions = DraftValidator.Validate(draft)
|
||||
.Where(e => e.Code == "EquipmentSignalNameCollision")
|
||||
.ToList();
|
||||
if (collisions.Count > 0)
|
||||
{
|
||||
var summary = string.Join("; ", collisions.Select(e => e.Message));
|
||||
_log.Warning("StartDeployment rejected (signal collision): {Summary}", summary);
|
||||
replyTo.Tell(new StartDeploymentResult(
|
||||
StartDeploymentOutcome.Rejected,
|
||||
DeploymentId: null,
|
||||
RevisionHash: null,
|
||||
Message: summary,
|
||||
msg.CorrelationId));
|
||||
return;
|
||||
}
|
||||
|
||||
var artifact = await ConfigComposer.SnapshotAndFlattenAsync(db);
|
||||
var deploymentId = DeploymentId.NewId();
|
||||
var revHash = RevisionHash.Parse(artifact.RevisionHash);
|
||||
|
||||
@@ -88,9 +88,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
/// </summary>
|
||||
private readonly Queue<DateTime> _faultTimestamps = new();
|
||||
|
||||
/// <summary>Active subscription handle (null when not subscribed). Lifetime is one-per-actor —
|
||||
/// re-subscribe across reconnects is the consumer's responsibility today (subscribe-once
|
||||
/// semantics keep the actor simple; mux-driven re-subscribe is tracked as F8b/#113).</summary>
|
||||
/// <summary>Active subscription handle (null when not subscribed). Tracks the current live
|
||||
/// subscription; the actor auto-(re)subscribes on (re)connect and on each <see cref="Subscribe"/>
|
||||
/// message via <see cref="ResubscribeDesired"/> / <see cref="HandleSubscribeAsync"/>, so callers
|
||||
/// do not need to re-send subscription requests after a reconnect.</summary>
|
||||
private ISubscriptionHandle? _subscriptionHandle;
|
||||
private EventHandler<DataChangeEventArgs>? _dataChangeHandler;
|
||||
|
||||
@@ -314,20 +315,20 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
|
||||
private async Task HandleWriteAsync(WriteAttribute msg)
|
||||
{
|
||||
var replyTo = Sender;
|
||||
if (_driver is not IWritable writable)
|
||||
{
|
||||
Sender.Tell(new WriteAttributeResult(false, "Driver does not implement IWritable"));
|
||||
replyTo.Tell(new WriteAttributeResult(false, "Driver does not implement IWritable"));
|
||||
return;
|
||||
}
|
||||
|
||||
var replyTo = Sender;
|
||||
var request = new[] { new WriteRequest(msg.TagId, msg.Value) };
|
||||
// Bound the write so a hung backend can't pin this actor forever — decision #44/#45 keeps
|
||||
// retry off by default, but a stalled call still needs an answer.
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
try
|
||||
{
|
||||
var results = await writable.WriteAsync(request, cts.Token).ConfigureAwait(false);
|
||||
var results = await writable.WriteAsync(request, cts.Token);
|
||||
if (results is { Count: 1 } && IsGoodStatus(results[0].StatusCode))
|
||||
{
|
||||
replyTo.Tell(new WriteAttributeResult(true, null));
|
||||
@@ -348,19 +349,24 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
|
||||
private async Task HandleSubscribeAsync(Subscribe msg)
|
||||
{
|
||||
// Capture Sender/Self BEFORE any await. The re-subscribe path below awaits
|
||||
// UnsubscribeAsync, and a real async backend can resume the continuation off Akka's
|
||||
// ActorContext — reading raw Sender/Self/Context past that point throws
|
||||
// NotSupportedException ("no active ActorContext"). Keep ConfigureAwait off the awaits
|
||||
// in this handler so continuations resume on the actor context.
|
||||
var replyTo = Sender;
|
||||
var self = Self;
|
||||
if (_driver is not ISubscribable subscribable)
|
||||
{
|
||||
Sender.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
|
||||
replyTo.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
|
||||
return;
|
||||
}
|
||||
if (_subscriptionHandle is not null)
|
||||
{
|
||||
// Subscribe-twice — drop the prior subscription before establishing the new one.
|
||||
await UnsubscribeAsync().ConfigureAwait(false);
|
||||
await UnsubscribeAsync();
|
||||
}
|
||||
|
||||
var replyTo = Sender;
|
||||
var self = Self;
|
||||
try
|
||||
{
|
||||
_dataChangeHandler = (_, args) => self.Tell(new DataChangeForward(args.FullReference, args.Snapshot));
|
||||
@@ -368,8 +374,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
||||
_subscriptionHandle = await subscribable
|
||||
.SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token)
|
||||
.ConfigureAwait(false);
|
||||
.SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token);
|
||||
|
||||
replyTo.Tell(new SubscriptionEstablished(_subscriptionHandle.DiagnosticId, msg.FullReferences.Count));
|
||||
_log.Info("DriverInstance {Id}: subscribed to {Count} refs ({Diag})",
|
||||
@@ -393,7 +398,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
try
|
||||
{
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
await subscribable.UnsubscribeAsync(_subscriptionHandle, cts.Token).ConfigureAwait(false);
|
||||
await subscribable.UnsubscribeAsync(_subscriptionHandle, cts.Token);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
||||
@@ -0,0 +1,137 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Validation;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Configuration.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Verifies <see cref="DraftSnapshotFactory.FromConfigDbAsync"/> materialises a
|
||||
/// <see cref="DraftSnapshot"/> from the live config DB whose Tag/VirtualTag rows feed the
|
||||
/// equipment-signal collision rule — the one rule wired into the deploy gate (Task 3).
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class DraftSnapshotFactoryTests : IDisposable
|
||||
{
|
||||
private readonly OtOpcUaConfigDbContext _db;
|
||||
|
||||
/// <summary>Initializes a new instance with an isolated in-memory config DB.</summary>
|
||||
public DraftSnapshotFactoryTests()
|
||||
{
|
||||
var options = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
|
||||
.UseInMemoryDatabase($"draft-snapshot-{Guid.NewGuid():N}")
|
||||
.Options;
|
||||
_db = new OtOpcUaConfigDbContext(options);
|
||||
}
|
||||
|
||||
/// <summary>Disposes the database context.</summary>
|
||||
public void Dispose() => _db.Dispose();
|
||||
|
||||
/// <summary>Seeds one Equipment plus a Tag and a VirtualTag sharing (EquipmentId, Name); the
|
||||
/// snapshot must carry both signal collections AND the validator must flag the collision.</summary>
|
||||
[Fact]
|
||||
public async Task FromConfigDb_populates_Tags_and_VirtualTags_and_surfaces_collision()
|
||||
{
|
||||
SeedEquipment("eq-1");
|
||||
_db.Tags.Add(BuildTag(equipmentId: "eq-1", name: "speed"));
|
||||
_db.VirtualTags.Add(BuildVirtualTag(equipmentId: "eq-1", name: "speed"));
|
||||
await _db.SaveChangesAsync();
|
||||
|
||||
var snapshot = await DraftSnapshotFactory.FromConfigDbAsync(_db);
|
||||
|
||||
snapshot.Tags.Count.ShouldBe(1);
|
||||
snapshot.VirtualTags.Count.ShouldBe(1);
|
||||
DraftValidator.Validate(snapshot).ShouldContain(e => e.Code == "EquipmentSignalNameCollision");
|
||||
}
|
||||
|
||||
/// <summary>A Tag and a VirtualTag with distinct names under the same equipment do not collide,
|
||||
/// so the snapshot validates clean of the collision code.</summary>
|
||||
[Fact]
|
||||
public async Task FromConfigDb_no_collision_when_names_differ()
|
||||
{
|
||||
SeedEquipment("eq-1");
|
||||
_db.Tags.Add(BuildTag(equipmentId: "eq-1", name: "speed"));
|
||||
_db.VirtualTags.Add(BuildVirtualTag(equipmentId: "eq-1", name: "temperature"));
|
||||
await _db.SaveChangesAsync();
|
||||
|
||||
var snapshot = await DraftSnapshotFactory.FromConfigDbAsync(_db);
|
||||
|
||||
snapshot.Tags.Count.ShouldBe(1);
|
||||
snapshot.VirtualTags.Count.ShouldBe(1);
|
||||
DraftValidator.Validate(snapshot).ShouldNotContain(e => e.Code == "EquipmentSignalNameCollision");
|
||||
}
|
||||
|
||||
/// <summary>Seeds one active and one released reservation for the same equipment context;
|
||||
/// only the active row (ReleasedAt == null) should appear in the snapshot.</summary>
|
||||
[Fact]
|
||||
public async Task FromConfigDb_ActiveReservations_excludes_released_rows()
|
||||
{
|
||||
var equipmentUuid = Guid.NewGuid();
|
||||
|
||||
_db.ExternalIdReservations.Add(new ExternalIdReservation
|
||||
{
|
||||
ReservationId = Guid.NewGuid(),
|
||||
Kind = ReservationKind.ZTag,
|
||||
Value = "ZT-001",
|
||||
EquipmentUuid = equipmentUuid,
|
||||
ClusterId = "cluster-a",
|
||||
FirstPublishedBy = "test",
|
||||
ReleasedAt = null, // active
|
||||
});
|
||||
_db.ExternalIdReservations.Add(new ExternalIdReservation
|
||||
{
|
||||
ReservationId = Guid.NewGuid(),
|
||||
Kind = ReservationKind.ZTag,
|
||||
Value = "ZT-002",
|
||||
EquipmentUuid = equipmentUuid,
|
||||
ClusterId = "cluster-a",
|
||||
FirstPublishedBy = "test",
|
||||
ReleasedAt = new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc), // released
|
||||
ReleasedBy = "test",
|
||||
ReleaseReason = "retired",
|
||||
});
|
||||
await _db.SaveChangesAsync();
|
||||
|
||||
var snapshot = await DraftSnapshotFactory.FromConfigDbAsync(_db);
|
||||
|
||||
snapshot.ActiveReservations.Count.ShouldBe(1);
|
||||
snapshot.ActiveReservations[0].Value.ShouldBe("ZT-001");
|
||||
snapshot.ActiveReservations[0].ReleasedAt.ShouldBeNull();
|
||||
}
|
||||
|
||||
private void SeedEquipment(string equipmentId)
|
||||
{
|
||||
var uuid = Guid.NewGuid();
|
||||
_db.Equipment.Add(new Equipment
|
||||
{
|
||||
EquipmentUuid = uuid,
|
||||
EquipmentId = equipmentId,
|
||||
Name = "eq",
|
||||
DriverInstanceId = "d",
|
||||
UnsLineId = "line-a",
|
||||
MachineCode = "m",
|
||||
});
|
||||
}
|
||||
|
||||
private static Tag BuildTag(string equipmentId, string name) => new()
|
||||
{
|
||||
TagId = $"tag-{name}",
|
||||
DriverInstanceId = "d",
|
||||
EquipmentId = equipmentId,
|
||||
Name = name,
|
||||
DataType = "Float",
|
||||
AccessLevel = TagAccessLevel.Read,
|
||||
TagConfig = "{}",
|
||||
};
|
||||
|
||||
private static VirtualTag BuildVirtualTag(string equipmentId, string name) => new()
|
||||
{
|
||||
VirtualTagId = $"vtag-{name}",
|
||||
EquipmentId = equipmentId,
|
||||
Name = name,
|
||||
DataType = "Float",
|
||||
ScriptId = "s-1",
|
||||
};
|
||||
}
|
||||
@@ -186,6 +186,76 @@ public sealed class DraftValidatorTests
|
||||
errors.ShouldContain(e => e.Code == "UnsSegmentInvalid");
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------
|
||||
// ValidateNoEquipmentSignalNameCollision — Tag/VirtualTag NodeId collision
|
||||
// ------------------------------------------------------------------------------------
|
||||
|
||||
/// <summary>Verifies that an equipment-bound Tag and a VirtualTag sharing the same
|
||||
/// equipment+name collide on a single OPC UA NodeId.</summary>
|
||||
[Fact]
|
||||
public void Tag_and_VirtualTag_same_equipment_and_name_collide()
|
||||
{
|
||||
var draft = new DraftSnapshot
|
||||
{
|
||||
GenerationId = 1, ClusterId = "c",
|
||||
Tags = [BuildTag(equipmentId: "eq-1", name: "speed", folderPath: null)],
|
||||
VirtualTags = [BuildVirtualTag(equipmentId: "eq-1", name: "speed")],
|
||||
};
|
||||
|
||||
DraftValidator.Validate(draft).ShouldContain(e => e.Code == "EquipmentSignalNameCollision");
|
||||
}
|
||||
|
||||
/// <summary>Verifies that a Tag in a sub-folder does not collide with a VirtualTag at the
|
||||
/// equipment root even when the names match — the NodeId keys differ.</summary>
|
||||
[Fact]
|
||||
public void Same_name_different_folder_does_not_collide()
|
||||
{
|
||||
var draft = new DraftSnapshot
|
||||
{
|
||||
GenerationId = 1, ClusterId = "c",
|
||||
Tags = [BuildTag(equipmentId: "eq-1", name: "speed", folderPath: "metrics")],
|
||||
VirtualTags = [BuildVirtualTag(equipmentId: "eq-1", name: "speed")],
|
||||
};
|
||||
|
||||
DraftValidator.Validate(draft).ShouldNotContain(e => e.Code == "EquipmentSignalNameCollision");
|
||||
}
|
||||
|
||||
/// <summary>Verifies that a SystemPlatform Tag (EquipmentId == null) is excluded from the
|
||||
/// equipment-signal node space and so never collides with an equipment VirtualTag.</summary>
|
||||
[Fact]
|
||||
public void SystemPlatform_tag_sharing_name_with_equipment_vtag_excluded()
|
||||
{
|
||||
var draft = new DraftSnapshot
|
||||
{
|
||||
GenerationId = 1, ClusterId = "c",
|
||||
Tags = [BuildTag(equipmentId: null, name: "speed", folderPath: null)],
|
||||
VirtualTags = [BuildVirtualTag(equipmentId: "eq-1", name: "speed")],
|
||||
};
|
||||
|
||||
DraftValidator.Validate(draft).ShouldNotContain(e => e.Code == "EquipmentSignalNameCollision");
|
||||
}
|
||||
|
||||
private static Tag BuildTag(string? equipmentId, string name, string? folderPath) => new()
|
||||
{
|
||||
TagId = $"tag-{name}",
|
||||
DriverInstanceId = "d",
|
||||
EquipmentId = equipmentId,
|
||||
Name = name,
|
||||
FolderPath = folderPath,
|
||||
DataType = "Float",
|
||||
AccessLevel = TagAccessLevel.Read,
|
||||
TagConfig = "{}",
|
||||
};
|
||||
|
||||
private static VirtualTag BuildVirtualTag(string equipmentId, string name) => new()
|
||||
{
|
||||
VirtualTagId = $"vtag-{name}",
|
||||
EquipmentId = equipmentId,
|
||||
Name = name,
|
||||
DataType = "Float",
|
||||
ScriptId = "s-1",
|
||||
};
|
||||
|
||||
// ------------------------------------------------------------------------------------
|
||||
// Phase 6.3 task #148 part 2 — ValidateClusterTopology
|
||||
// ------------------------------------------------------------------------------------
|
||||
|
||||
@@ -43,6 +43,60 @@ public sealed class AdminOperationsActorTests : ControlPlaneActorTestBase
|
||||
db.ConfigEdits.Single().EntityType.ShouldBe("Deployment");
|
||||
}
|
||||
|
||||
/// <summary>Verifies the surgical DraftValidator gate: a Tag↔VirtualTag NodeId collision in
|
||||
/// the live config rejects the deploy (422-mapped <see cref="StartDeploymentOutcome.Rejected"/>)
|
||||
/// before any coordinator dispatch — and inserts no Deployment row.</summary>
|
||||
[Fact]
|
||||
public void StartDeployment_rejects_on_Tag_VirtualTag_NodeId_collision()
|
||||
{
|
||||
var dbFactory = NewInMemoryDbFactory();
|
||||
using (var db = dbFactory.CreateDbContext())
|
||||
{
|
||||
db.Equipment.Add(new Configuration.Entities.Equipment
|
||||
{
|
||||
EquipmentUuid = Guid.NewGuid(),
|
||||
EquipmentId = "eq-1",
|
||||
Name = "eq",
|
||||
DriverInstanceId = "d",
|
||||
UnsLineId = "line-a",
|
||||
MachineCode = "m",
|
||||
});
|
||||
db.Tags.Add(new Configuration.Entities.Tag
|
||||
{
|
||||
TagId = "tag-speed",
|
||||
DriverInstanceId = "d",
|
||||
EquipmentId = "eq-1",
|
||||
Name = "speed",
|
||||
DataType = "Float",
|
||||
AccessLevel = TagAccessLevel.Read,
|
||||
TagConfig = "{}",
|
||||
});
|
||||
db.VirtualTags.Add(new Configuration.Entities.VirtualTag
|
||||
{
|
||||
VirtualTagId = "vtag-speed",
|
||||
EquipmentId = "eq-1",
|
||||
Name = "speed",
|
||||
DataType = "Float",
|
||||
ScriptId = "s-1",
|
||||
});
|
||||
db.SaveChanges();
|
||||
}
|
||||
|
||||
var coordinator = CreateTestProbe("coord");
|
||||
var actor = Sys.ActorOf(AdminOperationsActor.Props(dbFactory, coordinator.Ref, Enumerable.Empty<IDriverProbe>()));
|
||||
|
||||
actor.Tell(new StartDeployment("joe", CorrelationId.NewId()));
|
||||
|
||||
coordinator.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
||||
var reply = ExpectMsg<StartDeploymentResult>(TimeSpan.FromSeconds(3));
|
||||
reply.Outcome.ShouldBe(StartDeploymentOutcome.Rejected);
|
||||
reply.Message.ShouldNotBeNull();
|
||||
reply.Message.ShouldContain("collide"); // the rule's message text
|
||||
|
||||
using var verify = dbFactory.CreateDbContext();
|
||||
verify.Deployments.Count().ShouldBe(0);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that starting a deployment is refused when another is in flight.</summary>
|
||||
[Fact]
|
||||
public void StartDeployment_refuses_when_another_is_in_flight()
|
||||
|
||||
@@ -182,6 +182,43 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
|
||||
AwaitCondition(() => driver.SubscribeCount >= 2, TimeSpan.FromSeconds(3));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies the re-subscribe path (the second Subscribe finds a live handle and first awaits
|
||||
/// UnsubscribeAsync) still replies SubscriptionEstablished. Regression for the no-ActorContext
|
||||
/// race: reading Sender after `await UnsubscribeAsync().ConfigureAwait(false)` resumed off the
|
||||
/// actor context and threw, so the reply never arrived. This drives the exact deploy-re-apply /
|
||||
/// bootstrap-restore path where `_subscriptionHandle is not null`.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Subscribe_twice_replies_SubscriptionEstablished_on_resubscribe()
|
||||
{
|
||||
// UnsubscribeYields makes the inner UnsubscribeAsync genuinely suspend, so the second
|
||||
// Subscribe's `await UnsubscribeAsync()` resumes off the actor context if ConfigureAwait(false)
|
||||
// is used — the exact condition that throws NotSupportedException on the subsequent Sender read.
|
||||
var driver = new SubscribableStubDriver { UnsubscribeYields = true };
|
||||
var parent = CreateTestProbe();
|
||||
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver));
|
||||
|
||||
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
|
||||
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
|
||||
|
||||
// First subscribe — establishes the handle.
|
||||
await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
|
||||
new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(250)),
|
||||
TimeSpan.FromSeconds(3));
|
||||
|
||||
// Second subscribe — `_subscriptionHandle is not null`, so the handler awaits
|
||||
// UnsubscribeAsync first and only then replies. Must still reply (before the fix, reading Sender after that await threw → no reply).
|
||||
var reply = await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
|
||||
new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(250)),
|
||||
TimeSpan.FromSeconds(3));
|
||||
|
||||
reply.ReferenceCount.ShouldBe(2);
|
||||
driver.SubscribeCount.ShouldBe(2);
|
||||
// Old handler must have been detached before the new one was attached — no leak.
|
||||
driver.OnDataChangeSubscriberCount.ShouldBe(1);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that subscribing to a non-ISubscribable driver replies with failure.</summary>
|
||||
[Fact]
|
||||
public async Task Subscribe_against_non_ISubscribable_replies_with_failure()
|
||||
@@ -301,6 +338,12 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
|
||||
/// <summary>The reference set passed to the most recent <see cref="SubscribeAsync"/> call.</summary>
|
||||
public IReadOnlyList<string>? LastSubscribedRefs;
|
||||
|
||||
/// <summary>When true, <see cref="UnsubscribeAsync"/> genuinely suspends (it awaits a task that is completed from a separate thread-pool work item)
|
||||
/// before completing, so a `ConfigureAwait(false)` continuation in the actor resumes off the
|
||||
/// Akka ActorContext on a thread-pool thread — reproducing the no-ActorContext race that a
|
||||
/// synchronously-completed stub task hides (the continuation otherwise runs inline).</summary>
|
||||
public bool UnsubscribeYields { get; set; }
|
||||
|
||||
/// <summary>Subscribes to the specified full references.</summary>
|
||||
/// <param name="fullReferences">The full references to subscribe to.</param>
|
||||
/// <param name="publishingInterval">The publishing interval.</param>
|
||||
@@ -316,8 +359,19 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
|
||||
/// <summary>Unsubscribes from the specified subscription handle.</summary>
|
||||
/// <param name="handle">The subscription handle.</param>
|
||||
/// <param name="cancellationToken">Cancellation token for the operation.</param>
|
||||
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||
=> Task.CompletedTask;
|
||||
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||
{
|
||||
if (UnsubscribeYields)
|
||||
{
|
||||
// Complete the awaited task from a fresh background thread that has NO Akka actor
|
||||
// cell on it, so the caller's `ConfigureAwait(false)` continuation resumes on a
|
||||
// clean thread-pool thread where InternalCurrentActorCellKeeper.Current is null —
|
||||
// a deterministic repro of the real async-backend no-ActorContext race.
|
||||
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
_ = Task.Run(() => tcs.SetResult());
|
||||
await tcs.Task.ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Fires a data change event with the specified parameters.</summary>
|
||||
/// <param name="fullRef">The full reference of the data that changed.</param>
|
||||
|
||||
Reference in New Issue
Block a user