Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7b12eebbd1 | |||
| bd190ab012 | |||
| 2ead9bc200 | |||
| 1ea08c3b10 | |||
| 4f43733b96 | |||
| 039111ca05 | |||
| 61627fc5b0 | |||
| 7f1018bac1 | |||
| c2c518862f | |||
| e962737d2c | |||
| 7773bdebbd | |||
| c79b292968 | |||
| a43b2ee6af | |||
| f5479f3ca3 | |||
| 00c849e63b | |||
| 3fc6ccad30 |
@@ -37,11 +37,14 @@ paths, timeouts, queue sizes, enum values, or protocol values are invalid.
|
||||
"MaxPendingCommandsPerSession": 128,
|
||||
"DefaultLeaseSeconds": 1800,
|
||||
"LeaseSweepIntervalSeconds": 30,
|
||||
"AllowMultipleEventSubscribers": false
|
||||
"AllowMultipleEventSubscribers": false,
|
||||
"MaxEventSubscribersPerSession": 8
|
||||
},
|
||||
"Events": {
|
||||
"QueueCapacity": 10000,
|
||||
"BackpressurePolicy": "FailFast"
|
||||
"BackpressurePolicy": "FailFast",
|
||||
"ReplayBufferCapacity": 1024,
|
||||
"ReplayRetentionSeconds": 300
|
||||
},
|
||||
"Dashboard": {
|
||||
"Enabled": true,
|
||||
@@ -123,23 +126,35 @@ to avoid accidental large allocations from malformed or oversized frames.
|
||||
| `MxGateway:Sessions:MaxPendingCommandsPerSession` | `128` | Maximum number of pending worker commands for one session. Excess commands fail fast instead of queueing indefinitely. |
|
||||
| `MxGateway:Sessions:DefaultLeaseSeconds` | `1800` | Initial session lease and refresh duration. Unary client activity extends the lease by this duration. |
|
||||
| `MxGateway:Sessions:LeaseSweepIntervalSeconds` | `30` | Hosted monitor interval for closing expired leases. Active event-stream subscribers keep a session from expiring while the stream remains attached. |
|
||||
| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. `true` is rejected until event fan-out is implemented. |
|
||||
| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. When `false` the session refuses a second subscriber with `AlreadyExists`. Set to `true` to enable fan-out via the `SessionEventDistributor`. |
|
||||
| `MxGateway:Sessions:MaxEventSubscribersPerSession` | `8` | Maximum number of concurrent `StreamEvents` subscribers per session when `AllowMultipleEventSubscribers` is `true`. Effectively 1 when `AllowMultipleEventSubscribers` is `false`. Must be greater than zero. |
|
||||
|
||||
All numeric session options must be greater than zero. The current event stream
|
||||
implementation supports one active subscriber per session; this preserves event
|
||||
ordering and avoids competing consumers.
|
||||
All numeric session options must be greater than zero.
|
||||
|
||||
## Event Options
|
||||
|
||||
| Option | Default | Description |
|
||||
|--------|---------|-------------|
|
||||
| `MxGateway:Events:QueueCapacity` | `10000` | Capacity for bounded per-session event queues used by the gateway worker event channel and the public gRPC event stream queue. |
|
||||
| `MxGateway:Events:BackpressurePolicy` | `FailFast` | Event backpressure behavior. `FailFast` faults the session on public stream queue overflow. `DisconnectSubscriber` disconnects only the slow stream. |
|
||||
| `MxGateway:Events:BackpressurePolicy` | `FailFast` | Per-subscriber event backpressure behavior when a subscriber's bounded event channel overflows. Overflow is isolated to the offending subscriber: it is always disconnected with an `EventQueueOverflow` fault while the session pump and other subscribers keep running. `FailFast` additionally faults the whole session only in the legacy single-subscriber case (the current default mode); with multiple subscribers it degrades to a per-subscriber disconnect so one slow consumer never faults a shared session. `DisconnectSubscriber` disconnects only the slow subscriber in all cases. |
|
||||
| `MxGateway:Events:ReplayBufferCapacity` | `1024` | Maximum number of events retained per session in the replay ring buffer, used to re-deliver events a returning subscriber missed (reconnect/reattach). The oldest retained event is evicted once this count is exceeded. `0` disables replay retention. |
|
||||
| `MxGateway:Events:ReplayRetentionSeconds` | `300` | Maximum age, in seconds, of an event retained in the replay ring buffer. Entries older than this are evicted regardless of capacity. `0` disables age-based eviction. |
|
||||
|
||||
`QueueCapacity` must be greater than zero. With `FailFast`, queue overflow
|
||||
faults the affected worker or session instead of silently dropping MXAccess
|
||||
events. With `DisconnectSubscriber`, public gRPC stream overflow terminates only
|
||||
the affected stream while the MXAccess session remains active.
|
||||
`QueueCapacity` must be greater than zero; it bounds each per-subscriber event
|
||||
channel fed by the session's single event pump. A slow subscriber overflows only
|
||||
its own channel and is always disconnected with an `EventQueueOverflow` fault
|
||||
rather than silently dropping MXAccess events — the pump, the session, and other
|
||||
subscribers are unaffected. With `FailFast` in the single-subscriber case (the
|
||||
default mode), that overflow additionally faults the whole session; with multiple
|
||||
subscribers `FailFast` degrades to a per-subscriber disconnect, matching
|
||||
`DisconnectSubscriber`, so one slow consumer cannot fault a session shared by
|
||||
healthy subscribers. With `DisconnectSubscriber`, overflow terminates only the
|
||||
affected stream while the MXAccess session remains active.
|
||||
|
||||
`ReplayBufferCapacity` and `ReplayRetentionSeconds` must each be greater than or
|
||||
equal to zero (either dimension can be disabled with `0`). A returning subscriber
|
||||
that asks for events older than the oldest still-retained event is told it missed
|
||||
events (a "gap") and must re-snapshot; whatever is still retained is replayed.
|
||||
|
||||
## Dashboard Options
|
||||
|
||||
|
||||
@@ -167,7 +167,7 @@ bearer). Each hub class is `[Authorize(Policy = HubClientsPolicy)]`.
|
||||
|---|---|---|---|---|
|
||||
| `DashboardSnapshotHub` | `/hubs/snapshot` | `DashboardSnapshotPublisher` (BackgroundService consuming `IDashboardSnapshotService.WatchSnapshotsAsync`) | `DashboardSnapshot` | Sent to all connected clients on every snapshot tick; new connections receive the current snapshot synchronously in `OnConnectedAsync`. |
|
||||
| `AlarmsHub` | `/hubs/alarms` | `AlarmsHubPublisher` (BackgroundService consuming `IGatewayAlarmService.StreamAsync(filter: null)`) | `AlarmFeedMessage` (`active_alarm` / `snapshot_complete` / `transition`) | Connected clients auto-join `__alarms__`; all clients receive every message. Publisher auto-reconnects every 5s on stream faults. |
|
||||
| `EventsHub` | `/hubs/events` | `DashboardEventBroadcaster` invoked by `EventStreamService` for each event it forwards to a gRPC client | `MxEvent` | Clients call `SubscribeSession(sessionId)` to join `session:{id}`. Events appear only while a gRPC client is also consuming that session's events — the dashboard is a passive mirror, not a separate worker subscriber. |
|
||||
| `EventsHub` | `/hubs/events` | `DashboardEventBroadcaster` invoked by each session's internal dashboard-mirror subscriber on its `SessionEventDistributor` (registered when the session becomes Ready) | `MxEvent` | Clients call `SubscribeSession(sessionId)` to join `session:{id}`. The dashboard is a first-class distributor subscriber, so it receives the session's events whether or not a gRPC client is streaming. It sees RAW session events — not the per-gRPC-subscriber `AfterWorkerSequence` filtering that `EventStreamService` applies at its own boundary — because the dashboard is a separate LDAP-authenticated monitoring view meant to show the session's full event activity (per-session dashboard ACL is tracked separately). |
|
||||
|
||||
`DashboardPageBase` opens a `DashboardSnapshotHub` connection via the connection
|
||||
factory in `OnInitializedAsync`, seeds `Snapshot` synchronously from
|
||||
@@ -184,7 +184,8 @@ Default cadences:
|
||||
- snapshot service produces one snapshot per
|
||||
`MxGateway:Dashboard:SnapshotIntervalMilliseconds` (default 1s);
|
||||
- alarm publisher emits on each transition observed by the central monitor;
|
||||
- event publisher emits per event forwarded by `StreamEvents`.
|
||||
- event publisher emits per event fanned out by the session's `SessionEventDistributor`
|
||||
to its internal dashboard-mirror subscriber (independent of any gRPC `StreamEvents`).
|
||||
|
||||
Avoid pushing every MXAccess data-change event into a wider broadcast group.
|
||||
The current design routes events strictly through `session:{id}` groups; the
|
||||
|
||||
@@ -0,0 +1,233 @@
|
||||
# Session Resilience Epic — Design
|
||||
|
||||
**Date:** 2026-06-15
|
||||
**Branch:** `feat/session-resilience`
|
||||
**Source:** `stillpending.md` §2 (intentional v1-deferred items), scoped into a real feature design.
|
||||
**Status:** Design approved; implementation plan to follow.
|
||||
|
||||
## Goal
|
||||
|
||||
Lift four deliberately-deferred v1 limitations into supported features, built on
|
||||
one shared foundation:
|
||||
|
||||
1. **Multi-event-subscriber fan-out** (§2: plumbed but validator-blocked).
|
||||
2. **Reconnectable sessions** (§2: 1:1 session↔connection today).
|
||||
3. **Per-session dashboard ACL** (§2 / EventsHub `TODO(per-session-acl)`).
|
||||
4. **Orphan-worker reattach on gateway restart** (§2 — **overturns a hard
|
||||
CLAUDE.md rule**, see "Documented-rule changes").
|
||||
|
||||
These are not peers: fan-out is the keystone, reconnect and reattach reuse its
|
||||
machinery, and three of the four need a new session-ownership concept.
|
||||
|
||||
## Documented-rule changes (explicit, owner-approved)
|
||||
|
||||
This epic deliberately reverses three documented v1 decisions. Each reversal is a
|
||||
required deliverable in the same change as the code:
|
||||
|
||||
- **CLAUDE.md:77** "Gateway restart does not reattach orphan workers… do not
|
||||
design code paths that assume reattachment." → reattach becomes supported,
|
||||
bounded, and opt-in.
|
||||
- **`docs/DesignDecisions.md:63-73`** "no reconnectable sessions for v1." →
|
||||
reconnect becomes supported within a bounded detach-grace window.
|
||||
- **`docs/DesignDecisions.md:75-80`** single event subscriber per session. →
|
||||
multi-subscriber fan-out becomes supported, capped.
|
||||
|
||||
The owner explicitly accepted overturning the reattach rule during design.
|
||||
|
||||
## Current-state seams (verified by recon, with citations)
|
||||
|
||||
- `GatewaySession.AttachEventSubscriber(bool allowMultipleSubscribers)`
|
||||
(`Sessions/GatewaySession.cs:386-408`) guards on a single int
|
||||
`_activeEventSubscriberCount` (`:16`) under `_syncRoot`; a second subscriber
|
||||
throws `EventSubscriberAlreadyActive` (`:398`).
|
||||
- `GatewayOptionsValidator.cs:181-185` hard-rejects
|
||||
`AllowMultipleEventSubscribers` ("not supported until event fan-out is
|
||||
implemented"); option bound at `SessionOptions.cs:26-29`.
|
||||
- `EventStreamService.StreamEventsAsync` (`Grpc/EventStreamService.cs:27-101`)
|
||||
creates **a new bounded `Channel<MxEvent>` per RPC call** (`:43-50`) and
|
||||
`ProduceEventsAsync` drains `session.ReadEventsAsync()` directly — a
|
||||
**destructive, single-consumer drain**. Two RPCs would fight over one queue.
|
||||
- Backpressure: `ProduceEventsAsync` uses non-blocking `TryWrite`; on overflow
|
||||
with `EventBackpressurePolicy.FailFast` (default, `EventOptions`) it calls
|
||||
`session.MarkFaulted` (`EventStreamService.cs:143-162`) — faulting the **whole
|
||||
session**, not just the slow consumer.
|
||||
- `DashboardEventBroadcaster.Publish` (`Dashboard/Hubs/DashboardEventBroadcaster.cs:13-44`)
|
||||
is called **inside** the per-RPC producer loop (`EventStreamService.cs:131-141`)
|
||||
— so the dashboard only mirrors events while a gRPC client is streaming. Latent
|
||||
bug: no gRPC subscriber ⇒ dashboard feed is dark.
|
||||
- Pipe name `mxaccess-gateway-{Environment.ProcessId}-{sessionId}`
|
||||
(`SessionManager.cs:433`); session id `session-{Guid:N}` (`:479`), in-memory
|
||||
`SessionRegistry` only (`SessionRegistry.cs:12`), **not persisted**.
|
||||
- `OrphanWorkerTerminator` (`Workers/OrphanWorkerTerminator.cs:49-112`) discovers
|
||||
orphans by executable name/path (x64 gateway cannot introspect the x86 worker
|
||||
module → image-name fallback) and **terminates** them; rationale comment at
|
||||
`:9-16`.
|
||||
- Pipe fault → `WorkerClient` read loop detects `EndOfStream`, session →
|
||||
`Faulted` (`WorkerClient.cs:376-381`); no reattach. Worker launch passes the
|
||||
per-session nonce via `MXGATEWAY_WORKER_NONCE` env var
|
||||
(`WorkerProcessLauncher.cs:180-182`).
|
||||
- Sessions store `ClientIdentity` (informational only, `GatewaySession.cs:114`);
|
||||
**no `OwnerKeyId`, no per-session ACL.** gRPC `StreamEvents` enforces per-item
|
||||
read constraints but **no session-level access gate** — any caller who knows a
|
||||
session id can stream it.
|
||||
- `EventsHub.SubscribeSession(string)` (`Dashboard/Hubs/EventsHub.cs:46-54`) joins
|
||||
group `session:{id}`; only hub-level `[Authorize(HubClientsPolicy)]` gates it,
|
||||
so **any** Admin/Viewer can subscribe to **any** session. `TODO(per-session-acl)`
|
||||
at `:39-43`. `DashboardSnapshotHub`/`AlarmsHub` broadcast to all connected clients. Hub bearer
|
||||
(`HubTokenService`, 30-min) carries name + roles only, **no session scope**.
|
||||
- `StreamEventsRequest.AfterWorkerSequence` already exists on the wire (the
|
||||
reconnect replay contract is half-built).
|
||||
|
||||
## Shared foundation
|
||||
|
||||
### A. `SessionEventDistributor` (one pump, N per-subscriber channels)
|
||||
|
||||
Per `GatewaySession`, replace the per-RPC direct drain with a single owned
|
||||
distributor:
|
||||
|
||||
- One background **pump task** drains `ReadEventsAsync()` exactly once.
|
||||
- Each event is (1) stamped with its worker sequence, (2) appended to a **bounded
|
||||
replay ring buffer** (retain last `ReplayBufferCapacity` events or
|
||||
`ReplayRetentionSeconds` seconds of history, whichever limit is reached first), and (3) `TryWrite`-fanned to every
|
||||
registered subscriber's own bounded `Channel<MxEvent>`.
|
||||
- **Per-subscriber backpressure isolation:** overflow completes only that
|
||||
subscriber's channel (policy `DisconnectSubscriber`); the session and peers are
|
||||
untouched. `FailFast`→`MarkFaulted` is retained only for the legacy
|
||||
single-subscriber config path, for backward compatibility.
|
||||
- **Constraint filtering stays per-subscriber:** the pump fans *raw* events; each
|
||||
subscriber's read loop applies its own API-key read subtree/glob filter exactly
|
||||
as today. No change to constraint semantics.
|
||||
- `AttachEventSubscriber` returns a lease carrying that subscriber's channel
|
||||
reader + its start sequence (for replay). `EventStreamService` reads the lease
|
||||
channel instead of creating its own channel and draining the session.
|
||||
|
||||
### B. Session ownership
|
||||
|
||||
Record an authoritative **`OwnerKeyId`** (the creating API key id) on the session
|
||||
at `OpenSession`, alongside the existing informational `ClientIdentity`. This one
|
||||
field underpins ACL, reconnect re-validation, and reattach adoption.
|
||||
|
||||
## Feature designs
|
||||
|
||||
### 1. Multi-subscriber fan-out
|
||||
|
||||
- Remove the `GatewayOptionsValidator.cs:181-185` rejection; keep the option but
|
||||
allow `true`.
|
||||
- `_activeEventSubscriberCount` → a subscriber-lease collection on the
|
||||
distributor. New cap `MaxEventSubscribersPerSession` (default 8) → reject the
|
||||
N+1 attach with `EventSubscriberLimitReached`.
|
||||
- Dashboard broadcaster registers as a distributor subscriber (removing the inline
|
||||
tap), fixing the dashboard-dark-without-gRPC bug.
|
||||
- **No proto change.**
|
||||
|
||||
### 2. Reconnectable sessions
|
||||
|
||||
- On stream drop, a session in **detach-grace** mode is retained (not closed) for
|
||||
`DetachGraceSeconds` (separate from the session lease). New session
|
||||
disconnect-policy value `DetachGrace`.
|
||||
- On reconnect: client calls `StreamEvents` with the same session id +
|
||||
`AfterWorkerSequence = lastSeen`. The distributor replays ring-buffer events
|
||||
with `sequence > AfterWorkerSequence`, then resumes live.
|
||||
- If the requested sequence is older than the ring's oldest retained event (gone
|
||||
too long / ring overflowed), the server signals **`ReplayGap`** so the client
|
||||
re-snapshots. **Contract addition** (a `ReplayGap` status / response marker) →
|
||||
codegen ripple across all 5 clients.
|
||||
- Reconnect re-validates caller `OwnerKeyId` == session owner → else
|
||||
`PermissionDenied`.
|
||||
|
||||
### 3. Per-session ACL
|
||||
|
||||
- **gRPC (real security win, no proto change):** `Invoke` / `StreamEvents` /
|
||||
`CloseSession` gated to the owning API key, OR a key holding a new
|
||||
all-sessions admin scope → else `PermissionDenied`. Enforced in
|
||||
`MxAccessGatewayService` against session `OwnerKeyId`.
|
||||
- **Dashboard:** identity-domain mismatch (LDAP Admin/Viewer users vs API-key
|
||||
sessions) means no natural owner link.
|
||||
- **Decision required (flagged, not hard-coded):** default proposal —
|
||||
**Admin** sees all sessions; **Viewer** scoped via config
|
||||
`Dashboard:GroupToSessionTag` matched against an optional session `Tag`.
|
||||
Enforced at `EventsHub.SubscribeSession` and in the `/hubs/token` mint
|
||||
(token gains an allowed-session-tag claim). The owner may instead choose a
|
||||
strict default (Viewers see nothing unless granted).
|
||||
|
||||
### 4. Orphan-worker reattach
|
||||
|
||||
- **Stable pipe naming:** drop `{gatewayPid}`; use a persisted stable
|
||||
gateway-instance id. Replaces the pid's collision-avoidance role.
|
||||
- **Adoption manifest:** persist a minimal record per live session
|
||||
(`sessionId → workerPid, nonce, ownerKeyId, pipeName`) in the existing SQLite
|
||||
store. This is the *only* persisted session state; COM/advise state stays in
|
||||
the worker.
|
||||
- **Worker phones home:** the worker runs a reconnect loop with bounded backoff;
|
||||
the restarted gateway re-opens pipe servers for manifest entries and the
|
||||
surviving worker re-attaches, presenting its **nonce**. Gateway validates the
|
||||
nonce against the manifest and **rejects impostors / foreign workers**.
|
||||
- **Resync, not replay:** the in-memory ring buffer is lost on restart, so a
|
||||
reattached session's subscribers get `ReplayGap` and re-snapshot. Gateway
|
||||
resyncs worker view via the now-implemented `GetSessionState` / `GetWorkerInfo`
|
||||
commands.
|
||||
- **Safety net retained:** workers self-terminate after `MaxOrphanLifetime` with
|
||||
no re-adoption; `OrphanWorkerTerminator` stays as the fallback for un-adoptable
|
||||
or foreign workers. Reattach is opt-in (`Workers:EnableOrphanReattach`,
|
||||
default off) so the documented-safe behavior remains the default.
|
||||
- **Pipe protocol:** add an **adopt/reconnect frame** to `mxaccess_worker.proto`
|
||||
→ worker codegen regen + commit `Generated/` (net48 regen rule applies).
|
||||
|
||||
## Contract / codegen impact
|
||||
|
||||
Unlike the prior epic, this is **not zero-proto**:
|
||||
|
||||
- `mxaccess_gateway.proto` — `ReplayGap` signal for reconnect (Feature 2).
|
||||
- `mxaccess_worker.proto` — adopt/reconnect frame (Feature 4).
|
||||
|
||||
Per the repo rule: regenerate `Generated/`, commit it, rebuild gateway + worker +
|
||||
every generated client touched, and update affected docs in the same change.
|
||||
|
||||
## Error handling
|
||||
|
||||
- Per-subscriber overflow → disconnect that subscriber only; session survives.
|
||||
- Reconnect past the ring horizon → `ReplayGap`, client re-snapshots (no silent
|
||||
loss).
|
||||
- Reattach nonce mismatch → reject + fall back to termination.
|
||||
- ACL denial → `PermissionDenied` (gRPC) / hub subscribe refused (dashboard).
|
||||
- All worker COM/STA interactions keep MXAccess parity — no synthesized events,
|
||||
no "fixing" surprising returns.
|
||||
|
||||
## Testing & cross-platform verification
|
||||
|
||||
| Area | Test | Host |
|
||||
|---|---|---|
|
||||
| Distributor fan-out, per-sub backpressure, replay ring | unit, `FakeWorkerHarness` | local (macOS) |
|
||||
| Reconnect replay + `ReplayGap` | unit + fake-worker integration | local |
|
||||
| Session ownership / gRPC ACL | unit + gateway integration | local |
|
||||
| Dashboard per-session ACL | LDAP test users (`multi-role`/`gw-viewer`) | local + live LDAP |
|
||||
| Worker adopt frame, reattach handshake | worker unit (net48/x86) | **windev** |
|
||||
| Gateway-restart reattach round-trip | integration | **windev** + live worker |
|
||||
| Client `ReplayGap` handling | per-client tests; Java on macOS JDK 21 | local |
|
||||
|
||||
TDD throughout; per-task commits; `Generated/` regenerated+committed on proto
|
||||
changes; docs (incl. the three documented-rule reversals) updated in the same
|
||||
change as source.
|
||||
|
||||
## Delivery order (dependency stack)
|
||||
|
||||
Each phase is independently shippable:
|
||||
|
||||
1. **Foundation** — `SessionEventDistributor` + replay ring + session `OwnerKeyId`
|
||||
(refactor with no external behavior change except the dashboard-dark fix).
|
||||
2. **Fan-out** — remove validator block, subscriber-lease list, cap, dashboard as
|
||||
subscriber.
|
||||
3. **Reconnect** — detach-grace, replay-on-reconnect, `ReplayGap` contract +
|
||||
client handling.
|
||||
4. **Per-session ACL** — gRPC owner gate + dashboard scoping.
|
||||
5. **Reattach** — stable pipe naming, adoption manifest, worker phone-home +
|
||||
adopt frame, resync, safety net; documented-rule reversals.
|
||||
|
||||
## Out of scope
|
||||
|
||||
- Cross-gateway / clustered session sharing (single gateway instance only).
|
||||
- Event persistence beyond the in-memory ring (no durable event log).
|
||||
- Reconnect across a gateway *restart* with zero event gap (restart always yields
|
||||
`ReplayGap` by design — the ring is in-memory).
|
||||
- Per-session ACL on `DashboardSnapshotHub` / `AlarmsHub` (they broadcast aggregate state;
|
||||
only `EventsHub` is session-scoped).
|
||||
@@ -0,0 +1,417 @@
|
||||
# Session Resilience Epic — Implementation Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:subagent-driven-development (same session) or executing-plans (parallel session) to implement this plan task-by-task.
|
||||
|
||||
**Goal:** Lift four deferred v1 limitations — multi-subscriber fan-out, reconnectable sessions, per-session ACL, orphan-worker reattach — onto one shared event-distribution foundation.
|
||||
|
||||
**Architecture:** A per-session `SessionEventDistributor` (one pump → N per-subscriber bounded channels + a bounded replay ring) replaces today's per-RPC destructive drain. Session ownership (`OwnerKeyId`) underpins ACL, reconnect re-validation, and reattach adoption. See `docs/plans/2026-06-15-session-resilience-design.md`.
|
||||
|
||||
**Tech Stack:** .NET 10 gateway (x64), .NET Framework 4.8 worker (x86, windev), SQLite auth/manifest store, gRPC + protobuf contracts (net10.0;net48), 5 language clients, Blazor/SignalR dashboard, LDAP dashboard auth.
|
||||
|
||||
**Cross-platform:** Gateway, dotnet/Go/Rust/Python clients, and the Java client build/test locally on macOS (JDK 21 at `~/.local/jdks/jdk-21.0.11+10/Contents/Home`). The net48/x86 worker and worker tests build/test on **windev** (ssh alias, PowerShell). Proto changes: regenerate `Generated/`, commit it, rebuild every touched component.
|
||||
|
||||
**Standing rules (from CLAUDE.md):** never log secrets/credentials/values; MXAccess parity (no synthesized events, no "fixing" surprising returns); no init-only props/positional records in net48 worker; update docs in the same change as source; branch already created (`feat/session-resilience`); per-task commits; build+test affected components before marking done.
|
||||
|
||||
---
|
||||
|
||||
## Phase 1 — Foundation (refactor; no external behavior change except the dashboard-dark fix)
|
||||
|
||||
### Task 1: Add `OwnerKeyId` to the session
|
||||
|
||||
**Classification:** small
|
||||
**Estimated implement time:** ~4 min
|
||||
**Parallelizable with:** none (other phase-1 tasks build on the session type)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs` (add `OwnerKeyId` readonly prop near `ClientIdentity:114`)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs` (set `OwnerKeyId` from the request identity at `OpenSession`, near `CreateSessionId:479`)
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs`
|
||||
|
||||
**Steps:** TDD — failing test asserting an opened session records the creating API key id → add the property + assignment from `IGatewayRequestIdentityAccessor.Current` → green → `dotnet build src/ZB.MOM.WW.MxGateway.Server` + run session tests → commit.
|
||||
|
||||
### Task 2: `SessionEventDistributor` skeleton (single pump, subscriber registry)
|
||||
|
||||
**Classification:** high-risk (concurrency / actor model)
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Create: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs`
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs`
|
||||
|
||||
**Design:** One background pump `Task` draining `session.ReadEventsAsync()` exactly once; a thread-safe subscriber collection where each subscriber owns a bounded `Channel<MxEvent>` (`SingleReader=true`, `FullMode=Wait` for the per-sub channel, but writes use non-blocking `TryWrite`). `Register(startSequence)` returns a lease (channel reader + dispose). Pump fans each drained event to all subscriber channels via `TryWrite`.
|
||||
|
||||
**Steps:** Failing test: two registered subscribers both receive the same fanned event; disposing one stops its delivery without affecting the other. Implement pump + registry. Green. Build + test. Commit.
|
||||
|
||||
### Task 3: Bounded replay ring buffer
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~4 min
|
||||
**Parallelizable with:** none (extends Task 2)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs`
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/EventOptions.cs` (add `ReplayBufferCapacity`, `ReplayRetentionSeconds`)
|
||||
- Test: `SessionEventDistributorTests.cs`
|
||||
|
||||
**Design:** Append each fanned event to a ring keyed by worker sequence, evicting by count (`ReplayBufferCapacity`) or age (`ReplayRetentionSeconds`), whichever comes first. Expose `TryGetReplayFrom(afterSequence, out events, out gap)`.
|
||||
|
||||
**Steps:** Failing test: events evicted past capacity; `TryGetReplayFrom` returns `gap=true` when requested sequence is older than the oldest retained. Implement. Green. Build+test. Commit.
|
||||
|
||||
### Task 4: Rewire `AttachEventSubscriber` + `EventStreamService` onto the distributor
|
||||
|
||||
**Classification:** high-risk (changes the live event path)
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs:386-408` (own a `SessionEventDistributor`; `AttachEventSubscriber` returns a lease wrapping `distributor.Register(...)`)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs:27-101` (read the lease's channel instead of creating a per-RPC channel and draining the session directly; remove the per-RPC `Channel.CreateBounded` at `:43-50`)
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs`
|
||||
|
||||
**Steps:** Failing test: a single subscriber still streams events end-to-end through the distributor (regression parity with today). Rewire. Keep per-item constraint filtering in the subscriber read loop. Green. Build + run gateway event-stream tests. Commit.
|
||||
|
||||
### Task 5: Per-subscriber backpressure isolation
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~4 min
|
||||
**Parallelizable with:** none (extends Tasks 2/4)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs`
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs` (overflow path `:143-162`)
|
||||
- Test: `SessionEventDistributorTests.cs`
|
||||
|
||||
**Design:** On a subscriber channel `TryWrite` failure, complete only that subscriber's channel with `EventQueueOverflow` (policy `DisconnectSubscriber`). Retain `FailFast`→`MarkFaulted` only when the session is in legacy single-subscriber mode (back-compat).
|
||||
|
||||
**Steps:** Failing test: a slow subscriber overflows and is disconnected while a second subscriber keeps receiving and the session stays `Ready`. Implement. Green. Build+test. Commit.
|
||||
|
||||
### Task 6: Dashboard broadcaster becomes a distributor subscriber
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~4 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs:131-141` (remove the inline `dashboardEventBroadcaster.Publish` tap)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs` (register the dashboard broadcaster as a distributor subscriber on session start)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardEventBroadcaster.cs` (consume from a distributor lease)
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardEventBroadcasterTests.cs`
|
||||
|
||||
**Steps:** Failing test: dashboard receives session events even with **no** active gRPC subscriber (fixes the latent dark-feed bug). Implement. Green. Build + dashboard tests. Commit.
|
||||
|
||||
---
|
||||
|
||||
## Phase 2 — Multi-subscriber fan-out
|
||||
|
||||
### Task 7: Remove the validator block + add the subscriber cap option
|
||||
|
||||
**Classification:** small
|
||||
**Estimated implement time:** ~3 min
|
||||
**Parallelizable with:** none (Task 8 is sequential — same files)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs:181-185` (delete the rejection)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/SessionOptions.cs` (add `MaxEventSubscribersPerSession`, default 8)
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Configuration/GatewayOptionsValidatorTests.cs`
|
||||
|
||||
**Steps:** Failing test: `AllowMultipleEventSubscribers=true` now validates clean. Remove rule, add option. Green. Build+test. Commit.
|
||||
|
||||
### Task 8: Subscriber-lease collection + cap enforcement
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~4 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs` (replace `_activeEventSubscriberCount:16` with a lease collection; honor `allowMultipleSubscribers`; reject N+1 with new `SessionManagerErrorCode.EventSubscriberLimitReached`)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManagerErrorCode.cs`
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs`
|
||||
|
||||
**Steps:** Failing tests: N subscribers attach concurrently up to the cap; N+1 throws `EventSubscriberLimitReached`; single-subscriber mode still rejects the 2nd. Implement. Green. Build+test. Commit.
|
||||
|
||||
### Task 9: Multi-subscriber end-to-end test via FakeWorkerHarness
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~4 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs`
|
||||
|
||||
**Steps:** Two concurrent `StreamEvents` RPCs on one session both receive every worker event; one cancels, the other continues. Build + full fake-worker suite. Commit.
|
||||
|
||||
---
|
||||
|
||||
## Phase 3 — Reconnectable sessions
|
||||
|
||||
### Task 10: Proto — `ReplayGap` signal (contract change)
|
||||
|
||||
**Classification:** high-risk (contracts → all clients)
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Contracts/Protos/mxaccess_gateway.proto` (add a `ReplayGap` marker — a `replay_gap` bool + `oldest_available_sequence` on the stream response, or a dedicated leading status frame)
|
||||
- Regenerate: `dotnet build src/ZB.MOM.WW.MxGateway.Contracts/ZB.MOM.WW.MxGateway.Contracts.csproj`; **commit** `src/ZB.MOM.WW.MxGateway.Contracts/Generated/*` (net48 regen rule — see `project_proto_codegen_regen`)
|
||||
- Test: contracts build both TFMs (net10.0;net48)
|
||||
|
||||
**Steps:** Add field(s), regen (run `del Generated/*.cs` first if needed to force regen), commit generated. Build contracts both TFMs. Commit. **This unblocks Task 11 and Task 14.**
|
||||
|
||||
### Task 11: Detach-grace session retention
|
||||
|
||||
**Classification:** high-risk (session lifecycle)
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs` (add `DetachGrace` retention: on last-subscriber-drop, keep session alive for `DetachGraceSeconds` instead of closing)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/SessionOptions.cs` (`DetachGraceSeconds`; new disconnect-policy value `DetachGrace`)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionLeaseMonitorHostedService.cs` (sweep expired detach-grace windows)
|
||||
- Test: `GatewaySessionTests.cs`
|
||||
|
||||
**Steps:** Failing test: subscriber drop under `DetachGrace` keeps the session `Ready` until the window expires, then closes. Implement. Green. Build + session/lease tests. Commit.
|
||||
|
||||
### Task 12: Replay-on-reconnect + emit `ReplayGap`
|
||||
|
||||
**Classification:** high-risk
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs` (on attach with `AfterWorkerSequence`, call `distributor.TryGetReplayFrom`; replay buffered events then resume live; if `gap`, send the `ReplayGap` marker first)
|
||||
- Test: `EventStreamServiceTests.cs`
|
||||
|
||||
**Steps:** Failing tests: reconnect with a known sequence replays only newer events; reconnect past the ring horizon yields `ReplayGap`. Implement. Green. Build + test. Commit.
|
||||
|
||||
### Task 13: Owner re-validation on reconnect
|
||||
|
||||
**Classification:** small
|
||||
**Estimated implement time:** ~3 min
|
||||
**Parallelizable with:** none (different assertion in the same service as Task 12 — sequence after 12)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs` (reconnect requires caller `OwnerKeyId` == session owner → `PermissionDenied`)
|
||||
- Test: `EventStreamServiceTests.cs`
|
||||
|
||||
**Steps:** Failing test: a different API key cannot resume someone else's session. Implement. Green. Build+test. Commit.
|
||||
|
||||
### Task 14: Client `ReplayGap` handling — all 5 clients
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~5 min each (dispatch as 5 parallel sub-tasks; disjoint files)
|
||||
**Parallelizable with:** each other (14a–14e)
|
||||
|
||||
**Files (one client each):**
|
||||
- 14a dotnet: `clients/dotnet/.../` stream consumer + test
|
||||
- 14b Go: `clients/go/mxgateway/` + `go test`
|
||||
- 14c Python: `clients/python/src/.../` + `pytest`
|
||||
- 14d Rust: `clients/rust/crates/.../` + `cargo test`/clippy
|
||||
- 14e Java: `clients/java/.../` + `gradle test` (macOS JDK 21; **revert generated `MxaccessGateway.java` churn** per `project_java_generated_churn`)
|
||||
|
||||
**Steps (each):** Regenerate client stubs from the updated proto; surface `ReplayGap` to the caller (callback/return marker) so apps know to re-snapshot; test the gap path. Build+test that client. Commit per client.
|
||||
|
||||
### Task 15: Reconnect integration test (fake worker)
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~4 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs`
|
||||
|
||||
**Steps:** Stream, drop, reconnect within grace with last sequence → no gap; reconnect after ring overflow → `ReplayGap`. Build + suite. Commit.
|
||||
|
||||
---
|
||||
|
||||
## Phase 4 — Per-session ACL
|
||||
|
||||
### Task 16: gRPC session-owner gate + all-sessions admin scope
|
||||
|
||||
**Classification:** high-risk (security)
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Grpc/MxAccessGatewayService.cs` (`Invoke`/`StreamEvents`/`CloseSession` require caller key == session `OwnerKeyId`, or a key bearing a new all-sessions scope)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Security/Authorization/` (define the all-sessions scope; map it)
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs`
|
||||
|
||||
**Steps:** Failing tests: foreign key gets `PermissionDenied` on another key's session; owner and all-sessions-scoped key succeed. Implement. Green. Build + gateway tests. Commit.
|
||||
|
||||
### Task 17: Session `Tag` + dashboard group→tag config
|
||||
|
||||
**Classification:** small
|
||||
**Estimated implement time:** ~3 min
|
||||
**Parallelizable with:** Task 16 (disjoint files)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs` (+`SessionManager` to set an optional `Tag` from the open request)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/` Dashboard options (`GroupToSessionTag` map)
|
||||
- Test: config-binding test
|
||||
|
||||
**Steps:** Failing test: a session carries its tag; config map binds. Implement. Green. Build+test. Commit.
|
||||
|
||||
### Task 18: EventsHub per-session ACL + hub-token session-tag claim
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none (depends on 17)
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/EventsHub.cs:39-54` (replace `TODO(per-session-acl)`: Admin sees all; Viewer allowed only if the session's tag is in the user's `GroupToSessionTag`-derived allowed set)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Dashboard/HubTokenService.cs` (mint an allowed-session-tag claim)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Dashboard/HubTokenAuthenticationHandler.cs` (carry the claim back)
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/EventsHubTests.cs`
|
||||
|
||||
> **Decision flagged in the design doc:** default is "Admin all / Viewer by tag map." If the owner chose the strict variant (Viewers see nothing unless granted), invert the default here — the executor must confirm which before implementing.
|
||||
|
||||
**Steps:** Failing tests: Viewer without the tag is refused `SubscribeSession`; Admin allowed; Viewer with the mapped tag allowed. Implement. Green. Build + dashboard tests. Commit.
|
||||
|
||||
### Task 19: ACL tests incl. live LDAP users
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~4 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.IntegrationTests/DashboardLdapLiveTests.cs` (extend; gated `MXGATEWAY_RUN_LIVE_LDAP_TESTS=1`)
|
||||
|
||||
**Steps:** With `multi-role` (Admin) vs `gw-viewer` (Viewer), assert subscribe authorization differs by session tag. Document if skipped (no live LDAP). Commit.
|
||||
|
||||
---
|
||||
|
||||
## Phase 5 — Orphan-worker reattach (overturns the CLAUDE.md rule)
|
||||
|
||||
### Task 20: Stable gateway-instance id + stable pipe naming
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~4 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs:433` (pipe name uses a persisted stable gateway-instance id instead of `Environment.ProcessId`)
|
||||
- Create: gateway-instance-id persistence (small file/SQLite row under `C:\ProgramData\MxGateway\`)
|
||||
- Test: `SessionManagerTests.cs` / a new instance-id test
|
||||
|
||||
**Steps:** Failing test: pipe name is stable across simulated restarts (same instance id). Implement. Green. Build + tests. Commit.
|
||||
|
||||
### Task 21: Adoption manifest store (SQLite)
|
||||
|
||||
**Classification:** high-risk (persistence, security material)
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Create: `src/ZB.MOM.WW.MxGateway.Server/Workers/WorkerAdoptionManifest.cs` (persist `sessionId → workerPid, nonce, ownerKeyId, pipeName`; upsert on launch, delete on clean close)
|
||||
- Modify: gateway-auth SQLite schema/migration
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/WorkerAdoptionManifestTests.cs`
|
||||
|
||||
> Nonce is security material — store it like other secrets (no plaintext logging; standing rule).
|
||||
|
||||
**Steps:** Failing test: manifest round-trips an entry; clean close removes it. Implement. Green. Build + tests. Commit.
|
||||
|
||||
### Task 22: Proto — worker adopt/reconnect frame (contract change)
|
||||
|
||||
**Classification:** high-risk (contracts → worker)
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Contracts/Protos/mxaccess_worker.proto` (add an adopt/reconnect `WorkerEnvelope` frame: worker presents `sessionId` + `nonce`; gateway ACK/NACK)
|
||||
- Regenerate + **commit** `Generated/*` (net48 rule)
|
||||
- Test: contracts build both TFMs
|
||||
|
||||
**Steps:** Add frame, regen, commit generated, build both TFMs. Commit. **Unblocks Tasks 23–25.**
|
||||
|
||||
### Task 23: Worker phone-home reconnect loop + self-terminate
|
||||
|
||||
**Classification:** high-risk (worker, net48/x86 — **windev**)
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeClient.cs` (on pipe drop: reconnect loop with bounded backoff to the stable pipe name; present the adopt frame)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Worker/` runtime (self-terminate after `MaxOrphanLifetime` with no adoption)
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.Worker.Tests/` (net48/x86 on windev)
|
||||
|
||||
**Steps:** Failing test (fake pipe server): worker retries and adopts; gives up + self-terminates past the lifetime. Build x86 + worker tests on **windev**. Commit. *(net48: no init-only/positional records.)*
|
||||
|
||||
### Task 24: Gateway adoption — re-open pipes, nonce-validate, reject impostors
|
||||
|
||||
**Classification:** high-risk (security, lifecycle — **windev** for live)
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Create: `src/ZB.MOM.WW.MxGateway.Server/Workers/OrphanWorkerAdopter.cs` (startup: read manifest, re-open pipe servers, accept adopt frames, validate nonce → adopt or reject)
|
||||
- Modify: gateway startup hosted-service order (adopter runs **before** `OrphanWorkerTerminator`; terminator handles only un-adoptable/foreign workers)
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/OrphanWorkerAdopterTests.cs`
|
||||
|
||||
**Steps:** Failing tests: matching nonce adopts and rebuilds the session; mismatched nonce is rejected and the worker terminated. Implement. Green. Build + tests. Commit.
|
||||
|
||||
### Task 25: Resync adopted worker + `ReplayGap` to subscribers
|
||||
|
||||
**Classification:** standard
|
||||
**Estimated implement time:** ~4 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Workers/OrphanWorkerAdopter.cs` (after adoption, `GetSessionState`/`GetWorkerInfo` to resync; reattached subscribers get `ReplayGap` since the ring is gone)
|
||||
- Test: `OrphanWorkerAdopterTests.cs`
|
||||
|
||||
**Steps:** Failing test: adopted session reports resynced state; a resuming subscriber receives `ReplayGap`. Implement. Green. Build + tests. Commit.
|
||||
|
||||
### Task 26: `EnableOrphanReattach` flag (default off) + terminator fallback
|
||||
|
||||
**Classification:** small
|
||||
**Estimated implement time:** ~3 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/WorkerOptions.cs` (`EnableOrphanReattach`, default `false`)
|
||||
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Workers/OrphanWorkerTerminator.cs` (unchanged default behavior when reattach disabled)
|
||||
- Test: `OrphanWorkerTerminatorTests.cs` / adopter test
|
||||
|
||||
**Steps:** Failing test: with the flag off, startup terminates orphan workers (today's behavior); with the flag on, startup adopts them. Implement. Green. Build + tests. Commit.
|
||||
|
||||
### Task 27: Gateway-restart reattach round-trip (integration, **windev** + live worker)
|
||||
|
||||
**Classification:** high-risk
|
||||
**Estimated implement time:** ~5 min
|
||||
**Parallelizable with:** none
|
||||
|
||||
**Files:**
|
||||
- Test: `src/ZB.MOM.WW.MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs` (gated `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1`)
|
||||
|
||||
**Steps:** Open session → simulate gateway restart → adopter re-adopts the surviving worker → session usable → subscriber gets `ReplayGap` then live events. Run on **windev** with live MXAccess. Document if skipped.
|
||||
|
||||
### Task 28: Documented-rule reversals + stillpending refresh
|
||||
|
||||
**Classification:** trivial (doc-only)
|
||||
**Estimated implement time:** ~3 min
|
||||
**Parallelizable with:** none (final)
|
||||
|
||||
**Files:**
|
||||
- Modify: `CLAUDE.md` (line ~77 — reattach now supported, opt-in/bounded)
|
||||
- Modify: `docs/DesignDecisions.md` (`:63-73` reconnect, `:75-80` multi-subscriber, reattach rationale → mark superseded with this design's date/commit)
|
||||
- Modify: `gateway.md` (post-v1 revisit items — reflect what shipped)
|
||||
- Modify: `stillpending.md` (§2 items: mark fan-out/reconnect/ACL/reattach Resolved with commit refs)
|
||||
- Modify: `docs/GatewayConfiguration.md` (new options: `MaxEventSubscribersPerSession`, `ReplayBufferCapacity`, `ReplayRetentionSeconds`, `DetachGraceSeconds`, `GroupToSessionTag`, `EnableOrphanReattach`, `MaxOrphanLifetime`)
|
||||
|
||||
**Steps:** Edit docs to match shipped behavior. Commit.
|
||||
|
||||
---
|
||||
|
||||
## Verification matrix
|
||||
|
||||
| Phase | Build/test | Host |
|
||||
|---|---|---|
|
||||
| 1–4 (gateway, clients) | `dotnet build` + gateway/fake-worker tests; per-client `go/pytest/cargo/gradle/dotnet test` | local (macOS) |
|
||||
| 3/5 proto changes | regen + commit `Generated/`; build contracts both TFMs; rebuild touched clients | local |
|
||||
| 5 worker (net48/x86) | `dotnet build -p:Platform=x86` + `Worker.Tests` | **windev** |
|
||||
| 5 live reattach + Phase-4 LDAP | opt-in gated integration tests | **windev** / live LDAP |
|
||||
|
||||
## Final integration review
|
||||
|
||||
After all tasks: dispatch a final integration reviewer over `git diff main..HEAD` focusing on the live event path, concurrency in `SessionEventDistributor`, security gates (ACL + nonce adoption), and the three documented-rule reversals. Then use superpowers-extended-cc:finishing-a-development-branch.
|
||||
@@ -0,0 +1,34 @@
|
||||
{
|
||||
"planPath": "docs/plans/2026-06-15-session-resilience.md",
|
||||
"tasks": [
|
||||
{"id": 108, "subject": "Task 1: Add OwnerKeyId to the session", "status": "pending"},
|
||||
{"id": 109, "subject": "Task 2: SessionEventDistributor skeleton", "status": "pending", "blockedBy": [108]},
|
||||
{"id": 110, "subject": "Task 3: Bounded replay ring buffer", "status": "pending", "blockedBy": [109]},
|
||||
{"id": 111, "subject": "Task 4: Rewire AttachEventSubscriber + EventStreamService onto distributor", "status": "pending", "blockedBy": [110]},
|
||||
{"id": 112, "subject": "Task 5: Per-subscriber backpressure isolation", "status": "pending", "blockedBy": [111]},
|
||||
{"id": 113, "subject": "Task 6: Dashboard broadcaster becomes a distributor subscriber", "status": "pending", "blockedBy": [111]},
|
||||
{"id": 114, "subject": "Task 7: Remove validator block + add subscriber cap option", "status": "pending", "blockedBy": [112]},
|
||||
{"id": 115, "subject": "Task 8: Subscriber-lease collection + cap enforcement", "status": "pending", "blockedBy": [114]},
|
||||
{"id": 116, "subject": "Task 9: Multi-subscriber end-to-end test (FakeWorkerHarness)", "status": "pending", "blockedBy": [115]},
|
||||
{"id": 117, "subject": "Task 10: Proto - ReplayGap signal", "status": "pending", "blockedBy": [116]},
|
||||
{"id": 118, "subject": "Task 11: Detach-grace session retention", "status": "pending", "blockedBy": [117]},
|
||||
{"id": 119, "subject": "Task 12: Replay-on-reconnect + emit ReplayGap", "status": "pending", "blockedBy": [118, 110]},
|
||||
{"id": 120, "subject": "Task 13: Owner re-validation on reconnect", "status": "pending", "blockedBy": [119, 108]},
|
||||
{"id": 121, "subject": "Task 14: Client ReplayGap handling - all 5 clients", "status": "pending", "blockedBy": [117]},
|
||||
{"id": 122, "subject": "Task 15: Reconnect integration test (fake worker)", "status": "pending", "blockedBy": [119]},
|
||||
{"id": 123, "subject": "Task 16: gRPC session-owner gate + all-sessions admin scope", "status": "pending", "blockedBy": [116, 108]},
|
||||
{"id": 124, "subject": "Task 17: Session Tag + dashboard group-to-tag config", "status": "pending", "blockedBy": [116]},
|
||||
{"id": 125, "subject": "Task 18: EventsHub per-session ACL + hub-token tag claim", "status": "pending", "blockedBy": [124]},
|
||||
{"id": 126, "subject": "Task 19: ACL tests incl. live LDAP users", "status": "pending", "blockedBy": [125]},
|
||||
{"id": 127, "subject": "Task 20: Stable gateway-instance id + stable pipe naming", "status": "pending", "blockedBy": [126]},
|
||||
{"id": 128, "subject": "Task 21: Adoption manifest store (SQLite)", "status": "pending", "blockedBy": [127]},
|
||||
{"id": 129, "subject": "Task 22: Proto - worker adopt/reconnect frame", "status": "pending", "blockedBy": [128]},
|
||||
{"id": 130, "subject": "Task 23: Worker phone-home reconnect loop + self-terminate", "status": "pending", "blockedBy": [129]},
|
||||
{"id": 131, "subject": "Task 24: Gateway adoption - re-open pipes, nonce-validate, reject impostors", "status": "pending", "blockedBy": [130]},
|
||||
{"id": 132, "subject": "Task 25: Resync adopted worker + ReplayGap to subscribers", "status": "pending", "blockedBy": [131, 119]},
|
||||
{"id": 133, "subject": "Task 26: EnableOrphanReattach flag (default off) + terminator fallback", "status": "pending", "blockedBy": [131]},
|
||||
{"id": 134, "subject": "Task 27: Gateway-restart reattach round-trip (WINDEV + live worker)", "status": "pending", "blockedBy": [132, 133]},
|
||||
{"id": 135, "subject": "Task 28: Documented-rule reversals + stillpending refresh", "status": "pending", "blockedBy": [134]}
|
||||
],
|
||||
"lastUpdated": "2026-06-15"
|
||||
}
|
||||
@@ -203,6 +203,7 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
|
||||
GatewaySession session = await _sessionManager.OpenSessionAsync(
|
||||
new SessionOpenRequest(BackendName, MonitorClientName, Guid.NewGuid().ToString("N"), CommandTimeout: null),
|
||||
MonitorClientName,
|
||||
ownerKeyId: null,
|
||||
stoppingToken)
|
||||
.ConfigureAwait(false);
|
||||
lock (_sync) { _session = session; }
|
||||
|
||||
@@ -2,4 +2,6 @@ namespace ZB.MOM.WW.MxGateway.Server.Configuration;
|
||||
|
||||
public sealed record EffectiveEventConfiguration(
|
||||
int QueueCapacity,
|
||||
string BackpressurePolicy);
|
||||
string BackpressurePolicy,
|
||||
int ReplayBufferCapacity,
|
||||
double ReplayRetentionSeconds);
|
||||
|
||||
@@ -6,4 +6,5 @@ public sealed record EffectiveSessionConfiguration(
|
||||
int MaxPendingCommandsPerSession,
|
||||
int DefaultLeaseSeconds,
|
||||
int LeaseSweepIntervalSeconds,
|
||||
bool AllowMultipleEventSubscribers);
|
||||
bool AllowMultipleEventSubscribers,
|
||||
int MaxEventSubscribersPerSession);
|
||||
|
||||
@@ -11,4 +11,20 @@ public sealed class EventOptions
|
||||
/// Gets the backpressure policy for event queue overflow.
|
||||
/// </summary>
|
||||
public EventBackpressurePolicy BackpressurePolicy { get; init; } = EventBackpressurePolicy.FailFast;
|
||||
|
||||
/// <summary>
|
||||
/// Gets the maximum number of events retained in the per-session replay ring buffer
|
||||
/// used to re-deliver events a returning subscriber missed (reconnect/reattach).
|
||||
/// When the buffer exceeds this count the oldest retained events are evicted first.
|
||||
/// A value of <c>0</c> disables replay retention entirely.
|
||||
/// </summary>
|
||||
public int ReplayBufferCapacity { get; init; } = 1024;
|
||||
|
||||
/// <summary>
|
||||
/// Gets the maximum age, in seconds, of an event retained in the per-session replay
|
||||
/// ring buffer. Entries older than this are evicted regardless of capacity. A value
|
||||
/// of <c>0</c> disables age-based eviction (only <see cref="ReplayBufferCapacity"/>
|
||||
/// bounds the buffer).
|
||||
/// </summary>
|
||||
public double ReplayRetentionSeconds { get; init; } = 300;
|
||||
}
|
||||
|
||||
@@ -46,10 +46,13 @@ public sealed class GatewayConfigurationProvider(IOptions<GatewayOptions> option
|
||||
MaxPendingCommandsPerSession: value.Sessions.MaxPendingCommandsPerSession,
|
||||
DefaultLeaseSeconds: value.Sessions.DefaultLeaseSeconds,
|
||||
LeaseSweepIntervalSeconds: value.Sessions.LeaseSweepIntervalSeconds,
|
||||
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers),
|
||||
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers,
|
||||
MaxEventSubscribersPerSession: value.Sessions.MaxEventSubscribersPerSession),
|
||||
Events: new EffectiveEventConfiguration(
|
||||
QueueCapacity: value.Events.QueueCapacity,
|
||||
BackpressurePolicy: value.Events.BackpressurePolicy.ToString()),
|
||||
BackpressurePolicy: value.Events.BackpressurePolicy.ToString(),
|
||||
ReplayBufferCapacity: value.Events.ReplayBufferCapacity,
|
||||
ReplayRetentionSeconds: value.Events.ReplayRetentionSeconds),
|
||||
Dashboard: new EffectiveDashboardConfiguration(
|
||||
Enabled: value.Dashboard.Enabled,
|
||||
AllowAnonymousLocalhost: value.Dashboard.AllowAnonymousLocalhost,
|
||||
|
||||
@@ -177,12 +177,10 @@ public sealed class GatewayOptionsValidator : OptionsValidatorBase<GatewayOption
|
||||
options.LeaseSweepIntervalSeconds,
|
||||
"MxGateway:Sessions:LeaseSweepIntervalSeconds must be greater than zero.",
|
||||
builder);
|
||||
|
||||
if (options.AllowMultipleEventSubscribers)
|
||||
{
|
||||
builder.Add(
|
||||
"MxGateway:Sessions:AllowMultipleEventSubscribers is not supported until event fan-out is implemented.");
|
||||
}
|
||||
AddIfNotPositive(
|
||||
options.MaxEventSubscribersPerSession,
|
||||
"MxGateway:Sessions:MaxEventSubscribersPerSession must be greater than zero.",
|
||||
builder);
|
||||
}
|
||||
|
||||
private static void ValidateEvents(EventOptions options, ValidationBuilder builder)
|
||||
@@ -193,6 +191,16 @@ public sealed class GatewayOptionsValidator : OptionsValidatorBase<GatewayOption
|
||||
{
|
||||
builder.Add("MxGateway:Events:BackpressurePolicy must be a supported backpressure policy.");
|
||||
}
|
||||
|
||||
// ReplayBufferCapacity and ReplayRetentionSeconds are bounds on the replay ring
|
||||
// buffer; 0 is a valid value (disables that dimension), so only negatives fail.
|
||||
AddIfNegative(
|
||||
options.ReplayBufferCapacity,
|
||||
"MxGateway:Events:ReplayBufferCapacity must be greater than or equal to zero.",
|
||||
builder);
|
||||
builder.RequireThat(
|
||||
options.ReplayRetentionSeconds >= 0,
|
||||
"MxGateway:Events:ReplayRetentionSeconds must be greater than or equal to zero.");
|
||||
}
|
||||
|
||||
private static void ValidateDashboard(DashboardOptions options, ValidationBuilder builder)
|
||||
|
||||
@@ -27,4 +27,11 @@ public sealed class SessionOptions
|
||||
/// Gets a value indicating whether multiple event subscribers are allowed per session.
|
||||
/// </summary>
|
||||
public bool AllowMultipleEventSubscribers { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets the maximum number of concurrent event subscribers per session.
|
||||
/// Applies when <see cref="AllowMultipleEventSubscribers"/> is <see langword="true"/>;
|
||||
/// effectively 1 when it is <see langword="false"/>. Must be greater than zero.
|
||||
/// </summary>
|
||||
public int MaxEventSubscribersPerSession { get; init; } = 8;
|
||||
}
|
||||
|
||||
@@ -138,6 +138,7 @@ public sealed class DashboardLiveDataService : IDashboardLiveDataService, IAsync
|
||||
GatewaySession session = await _sessionManager.OpenSessionAsync(
|
||||
new SessionOpenRequest(BackendName, ClientName, Guid.NewGuid().ToString("N"), CommandTimeout: null),
|
||||
ClientName,
|
||||
ownerKeyId: null,
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
|
||||
@@ -24,9 +24,21 @@ public sealed class DashboardEventBroadcaster(
|
||||
return;
|
||||
}
|
||||
|
||||
Task send = hubContext.Clients
|
||||
.Group(EventsHub.GroupName(sessionId))
|
||||
.SendAsync(EventsHub.EventMessage, mxEvent);
|
||||
// Wrap the Task acquisition in a try/catch so a hypothetical synchronous throw
|
||||
// from SendAsync (e.g. an implementation that throws before returning the Task)
|
||||
// cannot escape Publish. The interface contract is never-throw; fire-and-forget.
|
||||
Task send;
|
||||
try
|
||||
{
|
||||
send = hubContext.Clients
|
||||
.Group(EventsHub.GroupName(sessionId))
|
||||
.SendAsync(EventsHub.EventMessage, mxEvent);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogDebug(ex, "Dashboard event mirror to session {SessionId} threw synchronously.", sessionId);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!send.IsCompletedSuccessfully)
|
||||
{
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Threading.Channels;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
||||
using ZB.MOM.WW.MxGateway.Server.Configuration;
|
||||
using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
|
||||
using ZB.MOM.WW.MxGateway.Server.Metrics;
|
||||
using ZB.MOM.WW.MxGateway.Server.Sessions;
|
||||
using ZB.MOM.WW.MxGateway.Server.Workers;
|
||||
@@ -13,14 +11,46 @@ namespace ZB.MOM.WW.MxGateway.Server.Grpc;
|
||||
public sealed class EventStreamService(
|
||||
ISessionManager sessionManager,
|
||||
IOptions<GatewayOptions> options,
|
||||
MxAccessGrpcMapper mapper,
|
||||
GatewayMetrics metrics,
|
||||
IDashboardEventBroadcaster dashboardEventBroadcaster,
|
||||
ILogger<EventStreamService> logger) : IEventStreamService
|
||||
GatewayMetrics metrics) : IEventStreamService
|
||||
{
|
||||
/// <summary>
|
||||
/// Streams events from a session to the client asynchronously.
|
||||
/// Streams events from a session to the client asynchronously.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Task 4 rewired this from a per-RPC channel that drained the session directly
|
||||
/// to reading the subscriber's lease channel fed by the session's single
|
||||
/// <see cref="SessionEventDistributor"/> pump. The pump owns the single drain of
|
||||
/// the worker event stream and the worker→public mapping (mirroring the former
|
||||
/// <c>ProduceEventsAsync</c>); this loop is the per-subscriber boundary that
|
||||
/// applies the per-RPC filter (<c>AfterWorkerSequence</c>), queue-depth metrics,
|
||||
/// and the backpressure/overflow policy.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Task 6 moved the dashboard mirror OFF this per-RPC loop. The dashboard is now a
|
||||
/// first-class internal subscriber on the session's
|
||||
/// <see cref="SessionEventDistributor"/> (see <c>GatewaySession.StartDashboardMirror</c>),
|
||||
/// so it receives session events even when no gRPC client is streaming. This loop no
|
||||
/// longer mirrors to the dashboard. One deliberate consequence: the dashboard now sees
|
||||
/// RAW session events, not the per-gRPC-subscriber <c>AfterWorkerSequence</c>-filtered
|
||||
/// view this loop applies — the dashboard is a separate LDAP-authenticated monitoring
|
||||
/// view that should see the session's full event activity (per-session dashboard ACL is
|
||||
/// the separate Task 18).
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Overflow handling (Task 5): the distributor's per-subscriber channel is bounded
|
||||
/// and the pump writes non-blocking. When this subscriber's channel is full the pump
|
||||
/// applies the per-subscriber backpressure policy and completes this subscriber's
|
||||
/// channel with a <see cref="SessionManagerException"/>
|
||||
/// (<see cref="SessionManagerErrorCode.EventQueueOverflow"/>). That terminal fault
|
||||
/// surfaces here when the reader's <c>MoveNextAsync</c> throws, and — like the
|
||||
/// pre-epic per-RPC overflow — it propagates to the gRPC client unchanged. The
|
||||
/// overflow metric, and (in the legacy single-subscriber FailFast case) the session
|
||||
/// fault + fault metric, are recorded by the distributor's overflow handler so the
|
||||
/// session, the pump, and other subscribers are isolated from this subscriber's
|
||||
/// slowness.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
/// <param name="request">Stream events request.</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
/// <returns>Async enumerable of MX events.</returns>
|
||||
@@ -35,151 +65,80 @@ public sealed class EventStreamService(
|
||||
$"Session {request.SessionId} was not found.");
|
||||
}
|
||||
|
||||
using IDisposable subscriber = session.AttachEventSubscriber(
|
||||
// No `using` here — subscriber.Dispose() is called exactly once in the finally
|
||||
// block below, which also disposes the reader. A `using` declaration would add a
|
||||
// second Dispose on the same path and double-decrement the session subscriber count.
|
||||
IEventSubscriberLease subscriber = session.AttachEventSubscriber(
|
||||
options.Value.Sessions.AllowMultipleEventSubscribers);
|
||||
using CancellationTokenSource streamCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
|
||||
int streamQueueDepth = 0;
|
||||
Channel<MxEvent> eventQueue = Channel.CreateBounded<MxEvent>(
|
||||
new BoundedChannelOptions(options.Value.Events.QueueCapacity)
|
||||
{
|
||||
SingleReader = true,
|
||||
SingleWriter = true,
|
||||
FullMode = BoundedChannelFullMode.Wait,
|
||||
AllowSynchronousContinuations = false,
|
||||
});
|
||||
Task producerTask = ProduceEventsAsync(
|
||||
session,
|
||||
request.AfterWorkerSequence,
|
||||
eventQueue.Writer,
|
||||
() =>
|
||||
{
|
||||
Interlocked.Increment(ref streamQueueDepth);
|
||||
metrics.AdjustGrpcEventStreamQueueDepth(1);
|
||||
},
|
||||
streamCts.Token);
|
||||
ulong afterWorkerSequence = request.AfterWorkerSequence;
|
||||
IAsyncEnumerator<MxEvent> reader = subscriber.Reader
|
||||
.ReadAllAsync(cancellationToken)
|
||||
.GetAsyncEnumerator(cancellationToken);
|
||||
|
||||
try
|
||||
{
|
||||
await foreach (MxEvent mxEvent in eventQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
|
||||
while (true)
|
||||
{
|
||||
Interlocked.Decrement(ref streamQueueDepth);
|
||||
metrics.AdjustGrpcEventStreamQueueDepth(-1);
|
||||
MxEvent mxEvent;
|
||||
try
|
||||
{
|
||||
if (!await reader.MoveNextAsync().ConfigureAwait(false))
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
mxEvent = reader.Current;
|
||||
}
|
||||
catch (WorkerClientException workerException)
|
||||
{
|
||||
// The distributor pump completes every subscriber channel with the source
|
||||
// fault when the worker event stream terminates abnormally; that surfaces
|
||||
// here. Mirror the pre-Task-4 ProduceEventsAsync behavior: fault the
|
||||
// session and record the metric, then propagate the terminal fault to the
|
||||
// gRPC client.
|
||||
session.MarkFaulted(workerException.Message);
|
||||
metrics.Fault(WorkerClientErrorCode.WorkerFaulted.ToString());
|
||||
throw;
|
||||
}
|
||||
|
||||
// Per-RPC filter stays at the subscriber boundary: each request may resume
|
||||
// from a different AfterWorkerSequence, so the shared pump fans raw events and
|
||||
// this loop drops the ones at or below the caller's watermark.
|
||||
if (mxEvent.WorkerSequence <= afterWorkerSequence)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// Queue-depth gauge tracks events the pump has fanned into this subscriber's
|
||||
// channel but the client has not yet consumed — the same "buffered, not yet
|
||||
// delivered" quantity the pre-Task-4 per-RPC channel reported. The bounded
|
||||
// subscriber channel supports counting, so reconcile the gauge to the current
|
||||
// backlog; falling back to a no-op delta if a channel ever cannot count.
|
||||
int backlog = subscriber.Reader.CanCount ? subscriber.Reader.Count : streamQueueDepth;
|
||||
int delta = backlog - streamQueueDepth;
|
||||
if (delta != 0)
|
||||
{
|
||||
streamQueueDepth = backlog;
|
||||
metrics.AdjustGrpcEventStreamQueueDepth(delta);
|
||||
}
|
||||
|
||||
yield return mxEvent;
|
||||
}
|
||||
|
||||
await producerTask.ConfigureAwait(false);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await streamCts.CancelAsync().ConfigureAwait(false);
|
||||
await reader.DisposeAsync().ConfigureAwait(false);
|
||||
subscriber.Dispose();
|
||||
|
||||
try
|
||||
if (streamQueueDepth != 0)
|
||||
{
|
||||
await producerTask.ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException) when (streamCts.IsCancellationRequested)
|
||||
{
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
logger.LogDebug(
|
||||
exception,
|
||||
"Event stream producer stopped for session {SessionId}.",
|
||||
request.SessionId);
|
||||
}
|
||||
|
||||
int remainingDepth = Interlocked.Exchange(ref streamQueueDepth, 0);
|
||||
if (remainingDepth > 0)
|
||||
{
|
||||
metrics.AdjustGrpcEventStreamQueueDepth(-remainingDepth);
|
||||
metrics.AdjustGrpcEventStreamQueueDepth(-streamQueueDepth);
|
||||
streamQueueDepth = 0;
|
||||
}
|
||||
|
||||
metrics.StreamDisconnected("Detached");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProduceEventsAsync(
|
||||
GatewaySession session,
|
||||
ulong afterWorkerSequence,
|
||||
ChannelWriter<MxEvent> writer,
|
||||
Action eventQueued,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await foreach (WorkerEvent workerEvent in session
|
||||
.ReadEventsAsync(cancellationToken)
|
||||
.WithCancellation(cancellationToken)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
MxEvent publicEvent = mapper.MapEvent(workerEvent);
|
||||
if (publicEvent.WorkerSequence <= afterWorkerSequence)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// Mirror the event to the dashboard EventsHub group for this
|
||||
// session. Fire-and-forget — broadcast errors must not affect
|
||||
// the source gRPC stream. Server-041: the
|
||||
// IDashboardEventBroadcaster contract documents Publish as
|
||||
// never-throw, but we enforce that at the seam too, so a
|
||||
// future implementation that adds synchronous validation or
|
||||
// a serializer hop cannot fault the producer loop and end
|
||||
// this client's gRPC stream.
|
||||
try
|
||||
{
|
||||
dashboardEventBroadcaster.Publish(session.SessionId, publicEvent);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogDebug(
|
||||
ex,
|
||||
"Dashboard event mirror threw for session {SessionId}; continuing.",
|
||||
session.SessionId);
|
||||
}
|
||||
|
||||
if (!writer.TryWrite(publicEvent))
|
||||
{
|
||||
string message = $"Session {session.SessionId} event stream queue overflowed.";
|
||||
metrics.QueueOverflow("grpc-event-stream");
|
||||
if (options.Value.Events.BackpressurePolicy == EventBackpressurePolicy.FailFast)
|
||||
{
|
||||
session.MarkFaulted(message);
|
||||
metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.LogDebug(
|
||||
"Disconnecting event stream for session {SessionId} after queue overflow.",
|
||||
session.SessionId);
|
||||
}
|
||||
|
||||
writer.TryComplete(new SessionManagerException(
|
||||
SessionManagerErrorCode.EventQueueOverflow,
|
||||
message));
|
||||
return;
|
||||
}
|
||||
|
||||
eventQueued();
|
||||
}
|
||||
|
||||
writer.TryComplete();
|
||||
}
|
||||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
writer.TryComplete();
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
if (exception is WorkerClientException)
|
||||
{
|
||||
session.MarkFaulted(exception.Message);
|
||||
metrics.Fault(WorkerClientErrorCode.WorkerFaulted.ToString());
|
||||
}
|
||||
|
||||
writer.TryComplete(exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,6 +36,7 @@ public sealed class MxAccessGatewayService(
|
||||
.OpenSessionAsync(
|
||||
SessionOpenRequest.FromContract(request),
|
||||
ResolveClientIdentity(),
|
||||
identityAccessor.Current?.KeyId,
|
||||
context.CancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
|
||||
@@ -1,4 +1,10 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
||||
using ZB.MOM.WW.MxGateway.Server.Configuration;
|
||||
using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
|
||||
using ZB.MOM.WW.MxGateway.Server.Grpc;
|
||||
using ZB.MOM.WW.MxGateway.Server.Metrics;
|
||||
using ZB.MOM.WW.MxGateway.Server.Workers;
|
||||
|
||||
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
|
||||
@@ -7,6 +13,7 @@ public sealed class GatewaySession
|
||||
{
|
||||
private readonly object _syncRoot = new();
|
||||
private readonly SemaphoreSlim _closeLock = new(1, 1);
|
||||
private readonly SessionEventStreaming _eventStreaming;
|
||||
private IWorkerClient? _workerClient;
|
||||
private SessionState _state = SessionState.Creating;
|
||||
private string? _finalFault;
|
||||
@@ -14,6 +21,12 @@ public sealed class GatewaySession
|
||||
private DateTimeOffset? _leaseExpiresAt;
|
||||
private bool _closeStarted;
|
||||
private int _activeEventSubscriberCount;
|
||||
private SessionEventDistributor? _eventDistributor;
|
||||
private bool _eventDistributorStarted;
|
||||
private bool _dashboardMirrorStarted;
|
||||
private IEventSubscriberLease? _dashboardMirrorLease;
|
||||
private Task? _dashboardMirrorTask;
|
||||
private CancellationTokenSource? _dashboardMirrorCts;
|
||||
private readonly Dictionary<(int ServerHandle, int ItemHandle), SessionItemRegistration> _items = [];
|
||||
|
||||
/// <summary>
|
||||
@@ -30,6 +43,11 @@ public sealed class GatewaySession
|
||||
/// <param name="startupTimeout">Timeout for worker process startup.</param>
|
||||
/// <param name="shutdownTimeout">Timeout for worker process shutdown.</param>
|
||||
/// <param name="openedAt">Timestamp when the session opened.</param>
|
||||
/// <remarks>
|
||||
/// Constructs a session with no owner key (<see cref="OwnerKeyId"/> will be null).
|
||||
/// Authenticated call sites that have a resolved API key identity must use the
|
||||
/// 12-parameter overload and pass the caller's key id explicitly.
|
||||
/// </remarks>
|
||||
public GatewaySession(
|
||||
string sessionId,
|
||||
string backendName,
|
||||
@@ -48,6 +66,7 @@ public sealed class GatewaySession
|
||||
pipeName,
|
||||
nonce,
|
||||
clientIdentity,
|
||||
ownerKeyId: null,
|
||||
clientSessionName,
|
||||
clientCorrelationId,
|
||||
commandTimeout,
|
||||
@@ -66,6 +85,7 @@ public sealed class GatewaySession
|
||||
/// <param name="pipeName">Name of the named pipe for gateway-worker IPC.</param>
|
||||
/// <param name="nonce">Security nonce for worker validation.</param>
|
||||
/// <param name="clientIdentity">Client identity from the authentication context.</param>
|
||||
/// <param name="ownerKeyId">API key identifier of the caller that created this session.</param>
|
||||
/// <param name="clientSessionName">Client-supplied session name.</param>
|
||||
/// <param name="clientCorrelationId">Client-supplied correlation identifier.</param>
|
||||
/// <param name="commandTimeout">Timeout for command invocation.</param>
|
||||
@@ -73,19 +93,30 @@ public sealed class GatewaySession
|
||||
/// <param name="shutdownTimeout">Timeout for worker process shutdown.</param>
|
||||
/// <param name="leaseDuration">Duration of the session lease.</param>
|
||||
/// <param name="openedAt">Timestamp when the session opened.</param>
|
||||
/// <param name="eventStreaming">
|
||||
/// Dependencies the session uses to construct and own its
|
||||
/// <see cref="SessionEventDistributor"/> (the single per-session worker-event pump
|
||||
/// that fans raw mapped <see cref="MxEvent"/>s to every subscriber lease). When
|
||||
/// <see langword="null"/>, defaults are used (no replay logger, system clock, a
|
||||
/// fresh mapper, and default <see cref="EventOptions"/>) so unit tests that build a
|
||||
/// session directly still get a working distributor. Production passes the
|
||||
/// DI-resolved dependencies.
|
||||
/// </param>
|
||||
public GatewaySession(
|
||||
string sessionId,
|
||||
string backendName,
|
||||
string pipeName,
|
||||
string nonce,
|
||||
string? clientIdentity,
|
||||
string? ownerKeyId,
|
||||
string? clientSessionName,
|
||||
string? clientCorrelationId,
|
||||
TimeSpan commandTimeout,
|
||||
TimeSpan startupTimeout,
|
||||
TimeSpan shutdownTimeout,
|
||||
TimeSpan leaseDuration,
|
||||
DateTimeOffset openedAt)
|
||||
DateTimeOffset openedAt,
|
||||
SessionEventStreaming? eventStreaming = null)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(sessionId))
|
||||
{
|
||||
@@ -112,6 +143,7 @@ public sealed class GatewaySession
|
||||
PipeName = pipeName;
|
||||
Nonce = nonce;
|
||||
ClientIdentity = clientIdentity;
|
||||
OwnerKeyId = ownerKeyId;
|
||||
ClientSessionName = clientSessionName;
|
||||
ClientCorrelationId = clientCorrelationId;
|
||||
CommandTimeout = commandTimeout;
|
||||
@@ -121,6 +153,7 @@ public sealed class GatewaySession
|
||||
OpenedAt = openedAt;
|
||||
_lastClientActivityAt = openedAt;
|
||||
_leaseExpiresAt = openedAt + leaseDuration;
|
||||
_eventStreaming = eventStreaming ?? SessionEventStreaming.Default;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -148,6 +181,11 @@ public sealed class GatewaySession
|
||||
/// </summary>
|
||||
public string? ClientIdentity { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets the API key identifier of the caller that created this session.
|
||||
/// </summary>
|
||||
public string? OwnerKeyId { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets the client-supplied session name.
|
||||
/// </summary>
|
||||
@@ -318,9 +356,268 @@ public sealed class GatewaySession
|
||||
/// <summary>
/// Moves the session into <see cref="SessionState.Ready"/> and then starts the
/// internal dashboard mirror.
/// </summary>
/// <remarks>
/// <para>
/// The state transition runs first. <c>StartDashboardMirror</c> runs afterwards and is a
/// no-op when the session was built without a dashboard broadcaster.
/// </para>
/// <para>
/// When a broadcaster is present, the mirror registers an internal subscriber on the
/// session's <see cref="SessionEventDistributor"/> and only then requests the pump start.
/// The dashboard therefore receives session events even while no gRPC client is
/// streaming, and a pump started from here never begins draining the worker stream with
/// zero subscribers registered.
/// </para>
/// </remarks>
public void MarkReady()
{
    TransitionTo(SessionState.Ready);
    StartDashboardMirror();
}
|
||||
|
||||
// Obtains the session's single distributor, registers a new subscriber lease on it and,
// when this call is the one that claimed the start, kicks off the pump.
//
// Ordering matters: the lease is registered BEFORE the pump is started, so a pump started
// by this call always has this subscriber present for the first event it fans out. The
// pump is started at most once per session, by whichever caller first claims the start in
// EnsureDistributorCreated (this method or the dashboard mirror started from MarkReady).
// A later caller simply registers on the already-started distributor.
//
// The distributor is built with MapWorkerEventsAsync as its source; no per-subscriber
// filtering is applied here, so filtering such as AfterWorkerSequence is left to whoever
// consumes the returned lease.
private IEventSubscriberLease StartDistributorAndRegister()
{
    SessionEventDistributor eventDistributor = EnsureDistributorCreated(out bool claimedStart);

    // Register first, start second: see the ordering note above.
    IEventSubscriberLease subscriberLease = eventDistributor.Register();
    StartPumpIfRequested(eventDistributor, claimedStart);

    return subscriberLease;
}
|
||||
|
||||
// Returns the session's distributor, building it on first use, and tells the caller
// through startNow whether it is the one caller that must start the pump.
//
// Construction and the started-flag flip both happen inside _syncRoot, so concurrent
// callers (for example the dashboard mirror and a racing subscriber attach) agree on one
// distributor instance and exactly one of them observes startNow == true. Note that the
// flag records that the start was CLAIMED; actually starting the pump is the caller's job
// (see StartPumpIfRequested) and happens outside this lock.
private SessionEventDistributor EnsureDistributorCreated(out bool startNow)
{
    lock (_syncRoot)
    {
        if (_eventDistributor is not { } distributor)
        {
            // Options are read only when a distributor actually has to be built.
            EventOptions settings = _eventStreaming.EventOptions;
            distributor = new SessionEventDistributor(
                SessionId,
                MapWorkerEventsAsync,
                settings.QueueCapacity,
                settings.ReplayBufferCapacity,
                settings.ReplayRetentionSeconds,
                _eventStreaming.DistributorLogger,
                _eventStreaming.TimeProvider,
                CreateOverflowHandler(settings.BackpressurePolicy));
            _eventDistributor = distributor;
        }

        // Only the first caller to get here sees the unstarted state and claims the start.
        startNow = !_eventDistributorStarted;
        _eventDistributorStarted = true;

        return distributor;
    }
}
|
||||
|
||||
// Starts the distributor's pump when (and only when) the caller claimed the start in
// EnsureDistributorCreated; otherwise does nothing.
private static void StartPumpIfRequested(SessionEventDistributor distributor, bool startNow)
{
    if (startNow)
    {
        // Blocking on StartAsync is deliberate. It relies on StartAsync merely scheduling
        // the pump and handing back an already-completed task, without awaiting any real
        // I/O (that contract lives in SessionEventDistributor, not in this method). If
        // StartAsync ever becomes truly asynchronous, this sync-over-async call must be
        // changed together with it.
        distributor.StartAsync(CancellationToken.None).GetAwaiter().GetResult();
    }
}
|
||||
|
||||
// Registers the gateway-owned internal dashboard subscriber on the distributor and starts
// a background loop (RunDashboardMirrorAsync) that forwards every fanned event to the
// dashboard broadcaster. Idempotent: the first call that passes the guard flips
// _dashboardMirrorStarted, later calls return immediately. No-op when the session has no
// dashboard broadcaster, or when the session is already Closing/Closed/Faulted.
//
// Ordering: the internal subscriber is registered BEFORE the pump start is requested, so a
// pump started from here always has a subscriber present and the dashboard receives
// events with no gRPC subscriber attached.
//
// Locking discipline: _syncRoot is held only for the two short state sections below.
// Every call into the distributor (create / Register / start, and disposal of an
// abandoned lease) happens OUTSIDE _syncRoot, so this method never holds the session lock
// while entering distributor code and cannot take part in a lock-order inversion with
// the distributor's own lifecycle lock.
//
// Race-safety: _dashboardMirrorLease and _dashboardMirrorTask are published together in
// ONE lock section, so a reader that takes _syncRoot sees either both or neither. If the
// session moved to Closing/Closed/Faulted while the lease was being created, nothing is
// published, the mirror task is not started, and the just-created lease is disposed right
// after the lock is released so it is not orphaned.
private void StartDashboardMirror()
{
    IDashboardEventBroadcaster? broadcaster = _eventStreaming.DashboardBroadcaster;
    if (broadcaster is null)
    {
        return;
    }

    CancellationToken loopToken;
    lock (_syncRoot)
    {
        if (_dashboardMirrorStarted || _state is SessionState.Closing or SessionState.Closed or SessionState.Faulted)
        {
            return;
        }

        _dashboardMirrorStarted = true;
        _dashboardMirrorCts = new CancellationTokenSource();
        loopToken = _dashboardMirrorCts.Token;
    }

    // Create the distributor (claiming the start if we are first) and register the
    // internal subscriber BEFORE starting the pump. isInternal: true marks this lease as
    // the gateway's own subscriber rather than an external client.
    SessionEventDistributor distributor = EnsureDistributorCreated(out bool startNow);
    IEventSubscriberLease lease = distributor.Register(isInternal: true);
    StartPumpIfRequested(distributor, startNow);

    bool abandonLease;
    lock (_syncRoot)
    {
        // Re-check: disposal may have started while we were outside the lock.
        abandonLease = _state is SessionState.Closing or SessionState.Closed or SessionState.Faulted;
        if (!abandonLease)
        {
            // Publish lease and task together so observers under _syncRoot never see
            // one without the other. Task.Run only schedules the loop; it does not block.
            _dashboardMirrorLease = lease;
            _dashboardMirrorTask = Task.Run(
                () => RunDashboardMirrorAsync(broadcaster, lease, loopToken),
                CancellationToken.None);
        }
    }

    if (abandonLease)
    {
        // The lease was never published, so this method is its only owner. Dispose it
        // after leaving _syncRoot: disposal presumably calls back into the distributor,
        // and that must not happen while the session lock is held.
        lease.Dispose();
    }
}
|
||||
|
||||
// Background loop of the dashboard mirror: drains the internal subscriber's channel and
// hands each event, unfiltered, to the dashboard broadcaster under this session's id.
//
// The loop never lets a failure escape:
//   - a throwing Publish is logged at Debug and the loop moves on to the next event;
//   - cancellation via the supplied token ends the loop silently (teardown path);
//   - a SessionManagerException ends the loop silently — per the distributor's overflow
//     contract this is how an overflowed subscriber channel is completed, and it only
//     disconnects this mirror;
//   - any other exception (for example the event source terminating abnormally) is
//     logged at Debug and ends the loop.
// Log entries carry the session id only.
private async Task RunDashboardMirrorAsync(
    IDashboardEventBroadcaster broadcaster,
    IEventSubscriberLease lease,
    CancellationToken cancellationToken)
{
    try
    {
        await foreach (MxEvent fannedEvent in lease.Reader
            .ReadAllAsync(cancellationToken)
            .ConfigureAwait(false))
        {
            PublishBestEffort(fannedEvent);
        }
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Teardown: the mirror was asked to stop.
    }
    catch (SessionManagerException)
    {
        // The mirror's own channel was completed with a terminal fault; just stop.
    }
    catch (Exception loopFailure)
    {
        _eventStreaming.DistributorLogger.LogDebug(
            loopFailure,
            "Dashboard event mirror loop ended for session {SessionId}.",
            SessionId);
    }

    // Publishes one event and swallows (but logs) anything Publish throws, so a
    // misbehaving broadcaster cannot terminate the mirror loop.
    void PublishBestEffort(MxEvent candidate)
    {
        try
        {
            broadcaster.Publish(SessionId, candidate);
        }
        catch (Exception publishFailure)
        {
            _eventStreaming.DistributorLogger.LogDebug(
                publishFailure,
                "Dashboard event mirror threw for session {SessionId}; continuing.",
                SessionId);
        }
    }
}
|
||||
|
||||
// Creates the per-subscriber backpressure handler that the distributor calls when a
// subscriber's bounded channel overflows. The distributor itself always disconnects the
// offending subscriber with an EventQueueOverflow fault; this handler only contributes the
// observable side effects, preserving what the pre-epic per-RPC overflow path emitted:
//   - the queue-overflow metric is always recorded, labeled by subscriber kind
//     ("dashboard-mirror" for internal, "grpc-event-stream" for external). The delegate
//     carries isInternal directly (Issue 4), so no heuristic is involved;
//   - FailFast + isOnlySubscriber (the legacy single-subscriber case): the whole session is
//     faulted and the fault metric recorded, matching back-compat behavior;
//   - FailFast with several subscribers, or DisconnectSubscriber in any case: the session
//     is NOT faulted. The distributor's disconnect of the one slow subscriber is the whole
//     remedy, so the pump and the other subscribers carry on. Multi-subscriber FailFast
//     deliberately degrades to a disconnect because faulting a shared session on one slow
//     consumer would punish healthy subscribers.
private SubscriberOverflowHandler CreateOverflowHandler(EventBackpressurePolicy policy)
{
    GatewayMetrics metrics = _eventStreaming.Metrics;
    string sessionId = SessionId;
    bool failFastConfigured = policy == EventBackpressurePolicy.FailFast;

    void HandleOverflow(bool isOnlySubscriber, bool isInternal)
    {
        // Internal overflow = the gateway-owned dashboard mirror; external overflow = a
        // gRPC streaming client.
        metrics.QueueOverflow(isInternal ? "dashboard-mirror" : "grpc-event-stream");

        if (!failFastConfigured || !isOnlySubscriber)
        {
            return;
        }

        MarkFaulted($"Session {sessionId} event stream queue overflowed.");
        metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
    }

    return HandleOverflow;
}
|
||||
|
||||
// Event source handed to the distributor. Reads the worker event stream via
// ReadEventsAsync (the distributor guarantees there is a single consumer) and converts each
// frame to the public MxEvent with the shared mapper, yielding results in the order the
// worker frames arrive. Mirrors the former ProduceEventsAsync mapping exactly.
private async IAsyncEnumerable<MxEvent> MapWorkerEventsAsync(
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    MxAccessGrpcMapper mapper = _eventStreaming.Mapper;
    var workerFrames = ReadEventsAsync(cancellationToken);

    await foreach (WorkerEvent frame in workerFrames.ConfigureAwait(false))
    {
        MxEvent mapped = mapper.MapEvent(frame);
        yield return mapped;
    }
}
|
||||
|
||||
/// <summary>
|
||||
@@ -381,10 +678,15 @@ public sealed class GatewaySession
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Attaches an event subscriber and returns a disposable lease.
|
||||
/// Attaches an event subscriber and returns a lease whose
|
||||
/// <see cref="IEventSubscriberLease.Reader"/> reads the fanned public
|
||||
/// <see cref="MxEvent"/>s for this subscriber. The single-subscriber guard
|
||||
/// (Tasks 7/8 relax it) is unchanged: with multi-subscriber disabled a second
|
||||
/// attach is rejected. The returned lease, when disposed, unregisters the
|
||||
/// distributor subscriber AND decrements the active-subscriber count.
|
||||
/// </summary>
|
||||
/// <param name="allowMultipleSubscribers">If true, allows multiple concurrent event subscribers.</param>
|
||||
public IDisposable AttachEventSubscriber(bool allowMultipleSubscribers)
|
||||
public IEventSubscriberLease AttachEventSubscriber(bool allowMultipleSubscribers)
|
||||
{
|
||||
lock (_syncRoot)
|
||||
{
|
||||
@@ -403,7 +705,20 @@ public sealed class GatewaySession
|
||||
}
|
||||
|
||||
_activeEventSubscriberCount++;
|
||||
return new EventSubscriberLease(this);
|
||||
}
|
||||
|
||||
// Construct/start the distributor and register this subscriber. Done outside the
|
||||
// guard lock (StartDistributorAndRegister takes _syncRoot itself for construction).
|
||||
// On any failure roll back the count we just took so the guard stays consistent.
|
||||
try
|
||||
{
|
||||
IEventSubscriberLease distributorLease = StartDistributorAndRegister();
|
||||
return new EventSubscriberLease(this, distributorLease);
|
||||
}
|
||||
catch
|
||||
{
|
||||
DetachEventSubscriber();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -960,6 +1275,63 @@ public sealed class GatewaySession
|
||||
{
|
||||
}
|
||||
|
||||
// Stop the internal dashboard mirror first: cancel its loop, dispose its lease (which
|
||||
// unregisters its internal distributor subscriber and completes its channel), and
|
||||
// await the loop task. Done BEFORE disposing the distributor and worker client — like
|
||||
// the distributor itself — so the mirror is no longer reading the pump when the pump
|
||||
// and its source (the worker client) tear down.
|
||||
IEventSubscriberLease? dashboardLease;
|
||||
Task? dashboardTask;
|
||||
CancellationTokenSource? dashboardCts;
|
||||
lock (_syncRoot)
|
||||
{
|
||||
dashboardLease = _dashboardMirrorLease;
|
||||
dashboardTask = _dashboardMirrorTask;
|
||||
dashboardCts = _dashboardMirrorCts;
|
||||
_dashboardMirrorLease = null;
|
||||
_dashboardMirrorTask = null;
|
||||
_dashboardMirrorCts = null;
|
||||
}
|
||||
|
||||
if (dashboardCts is not null)
|
||||
{
|
||||
await dashboardCts.CancelAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
dashboardLease?.Dispose();
|
||||
|
||||
if (dashboardTask is not null)
|
||||
{
|
||||
try
|
||||
{
|
||||
await dashboardTask.ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// The mirror loop swallows its own faults; any escape here must not block
|
||||
// disposal. The loop has stopped, which is all teardown requires.
|
||||
}
|
||||
}
|
||||
|
||||
dashboardCts?.Dispose();
|
||||
|
||||
// Stop the event pump and complete every subscriber channel before tearing down the
|
||||
// worker client (the pump's source). DisposeAsync is the single session teardown
|
||||
// point (SessionManager.RemoveSessionAsync awaits it after close), so awaiting it
|
||||
// here guarantees the distributor's pump task is observed and subscribers are
|
||||
// completed rather than left dangling.
|
||||
SessionEventDistributor? distributor;
|
||||
lock (_syncRoot)
|
||||
{
|
||||
distributor = _eventDistributor;
|
||||
_eventDistributor = null;
|
||||
}
|
||||
|
||||
if (distributor is not null)
|
||||
{
|
||||
await distributor.DisposeAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (_workerClient is not null)
|
||||
{
|
||||
await _workerClient.DisposeAsync().ConfigureAwait(false);
|
||||
@@ -1101,22 +1473,30 @@ public sealed class GatewaySession
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
/// Session-level subscriber lease. Wraps the distributor registration lease so that one
/// dispose both unregisters the distributor subscriber and releases the session's
/// active-subscriber slot.
/// </summary>
private sealed class EventSubscriberLease(GatewaySession session, IEventSubscriberLease distributorLease)
    : IEventSubscriberLease
{
    // 0 = live, 1 = disposed. Interlocked so concurrent stream-completion +
    // client-cancellation paths cannot both call DetachEventSubscriber and
    // double-decrement _activeEventSubscriberCount to -1.
    private int _leaseDisposed;

    /// <inheritdoc />
    public System.Threading.Channels.ChannelReader<MxEvent> Reader => distributorLease.Reader;

    /// <summary>
    /// Disposes the lease: unregisters this subscriber from the distributor (completing
    /// its channel) and decrements the session's active-subscriber count. Ordering is
    /// not significant — the count guard and the distributor registration are
    /// independent — but both must run exactly once. Repeated or concurrent calls after
    /// the first are no-ops.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _leaseDisposed, 1) != 0)
        {
            return;
        }

        try
        {
            distributorLease.Dispose();
        }
        finally
        {
            // Always release the session's subscriber slot, even if unregistering from
            // the distributor throws; otherwise the single-subscriber guard would keep
            // counting a subscriber that no longer exists.
            session.DetachEventSubscriber();
        }
    }
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
using System.Threading.Channels;
|
||||
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
||||
|
||||
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
|
||||
|
||||
/// <summary>
/// Represents one subscriber's registration with a <see cref="SessionEventDistributor"/>.
/// The lease hands out the subscriber's private <see cref="ChannelReader{T}"/> of fanned
/// events; disposing it unregisters the subscriber and completes that channel, leaving the
/// pump and all other subscribers undisturbed.
/// </summary>
public interface IEventSubscriberLease : IDisposable
{
    /// <summary>
    /// Gets the reader side of the event channel that belongs to this subscriber.
    /// </summary>
    ChannelReader<MxEvent> Reader { get; }
}
|
||||
@@ -8,11 +8,13 @@ public interface ISessionManager
|
||||
/// <summary>Opens a new gateway session and launches a worker process.</summary>
|
||||
/// <param name="request">Request payload.</param>
|
||||
/// <param name="clientIdentity">Client identity string.</param>
|
||||
/// <param name="ownerKeyId">API key identifier of the caller creating the session.</param>
|
||||
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
|
||||
/// <returns>The newly opened session.</returns>
|
||||
Task<GatewaySession> OpenSessionAsync(
|
||||
SessionOpenRequest request,
|
||||
string? clientIdentity,
|
||||
string? ownerKeyId,
|
||||
CancellationToken cancellationToken);
|
||||
|
||||
/// <summary>Attempts to retrieve a session by ID.</summary>
|
||||
|
||||
@@ -0,0 +1,666 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Threading.Channels;
|
||||
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
||||
|
||||
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
|
||||
|
||||
/// <summary>
/// Callback the pump invokes (on the pump thread) when an event cannot be written because
/// a subscriber's bounded channel is full. Implementations apply policy side effects only:
/// recording the overflow metric and, in the legacy single-subscriber FailFast case,
/// faulting the owning session. Implementations MUST NOT complete the subscriber's
/// channel; the distributor disconnects the subscriber and completes its channel
/// unconditionally, whatever the handler does.
/// </summary>
/// <param name="isOnlySubscriber">
/// <see langword="true"/> when, at the moment of overflow, the overflowing subscriber is
/// the sole registered subscriber (legacy single-subscriber mode). Only then does FailFast
/// fault the session; with several subscribers FailFast degrades to a per-subscriber
/// disconnect so that one slow consumer never faults a session shared with others. Always
/// <see langword="false"/> for internal subscribers (the dashboard mirror), because
/// <see cref="SessionEventDistributor"/> leaves them out of the external-subscriber count.
/// </param>
/// <param name="isInternal">
/// <see langword="true"/> when the overflowing subscriber is the gateway-owned internal
/// dashboard mirror subscriber. Lets the handler pick the matching metric label
/// (<c>"dashboard-mirror"</c> vs <c>"grpc-event-stream"</c>).
/// </param>
public delegate void SubscriberOverflowHandler(bool isOnlySubscriber, bool isInternal);
|
||||
|
||||
/// <summary>
|
||||
/// Per-session event pump and fan-out. A single background task drains the
|
||||
/// session's event source <em>exactly once</em> and fans each event out to
|
||||
/// every currently-registered subscriber's own bounded channel.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Introduced by Task 2 of the Session Resilience epic; the bounded replay ring
|
||||
/// buffer was added by Task 3, it was wired into <c>GatewaySession</c> and
|
||||
/// <c>EventStreamService</c> by Task 4, and the per-subscriber backpressure-isolation
|
||||
/// policy (Task 5) is implemented here: a slow subscriber overflows only its own
|
||||
/// bounded channel and the pump applies the policy to that subscriber alone (see
|
||||
/// <see cref="SubscriberOverflowHandler"/> and <c>OnSubscriberOverflow</c>), leaving
|
||||
/// the pump, the session, and other subscribers running. The class does not yet
|
||||
/// remove the single-subscriber guard (Tasks 7/8). The ring buffer supports capacity
|
||||
/// eviction (oldest entry dropped when the count exceeds
|
||||
/// <c>replayBufferCapacity</c>) and age eviction (entries older than
|
||||
/// <c>replayRetentionSeconds</c> dropped on the next append or query), and is
|
||||
/// queried via <see cref="TryGetReplayFrom"/> by reconnecting subscribers.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>Source seam.</b> The event source is injected as a
|
||||
/// <see cref="Func{T, TResult}"/> producing an
|
||||
/// <see cref="IAsyncEnumerable{T}"/> of already-mapped public
|
||||
/// <see cref="MxEvent"/>s, given a <see cref="CancellationToken"/>. This is the
|
||||
/// cleanest seam for Task 4: it can pass
|
||||
/// <c>ct => session.ReadEventsAsync(ct).Select(mapper.MapEvent)</c> (or a
|
||||
/// channel reader's <c>ReadAllAsync</c>), while unit tests pass a plain
|
||||
/// channel reader's <c>ReadAllAsync</c> with no real session. The pump owns the
|
||||
/// single consumption of this enumerable; fan-out happens on the public
|
||||
/// <see cref="MxEvent"/> after mapping, mirroring today's
|
||||
/// <c>EventStreamService.ProduceEventsAsync</c> ordering.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>Concurrency.</b> The subscriber set is a
|
||||
/// <see cref="ConcurrentDictionary{TKey, TValue}"/> keyed by a monotonic id.
|
||||
/// The pump iterates it with a snapshot-free enumerator (which never throws on
|
||||
/// concurrent add/remove), and <see cref="Register"/> / lease disposal mutate it
|
||||
/// without any lock held across an <c>await</c>. Each subscriber channel has a
|
||||
/// single writer — the pump — so per-channel writes never race. MXAccess parity:
|
||||
/// events are fanned in the order received; the pump never reorders or
|
||||
/// synthesizes events.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class SessionEventDistributor : IAsyncDisposable
|
||||
{
|
||||
/// <summary>
|
||||
/// Bounded wait for the pump to stop during disposal. A source factory that
|
||||
/// ignores cancellation must not hang dispose forever; after this window the
|
||||
/// pump is abandoned and subscribers are completed anyway.
|
||||
/// </summary>
|
||||
private static readonly TimeSpan DefaultShutdownTimeout = TimeSpan.FromSeconds(5);
|
||||
|
||||
private readonly string _sessionId;
|
||||
private readonly Func<CancellationToken, IAsyncEnumerable<MxEvent>> _eventSourceFactory;
|
||||
private readonly int _subscriberQueueCapacity;
|
||||
private readonly SubscriberOverflowHandler? _overflowHandler;
|
||||
private readonly TimeSpan _shutdownTimeout;
|
||||
private readonly ILogger<SessionEventDistributor> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly ConcurrentDictionary<long, Subscriber> _subscribers = new();
|
||||
private readonly CancellationTokenSource _shutdownCts = new();
|
||||
private readonly object _lifecycleLock = new();
|
||||
|
||||
// Replay ring buffer. Appended on the pump thread and queried from arbitrary
|
||||
// threads via TryGetReplayFrom, so every access is under _replayLock. The deque
|
||||
// keeps events in ascending WorkerSequence order (the pump fans in source order),
|
||||
// so the oldest retained event is always at the front. Capacity == 0 disables
|
||||
// retention; RetentionSeconds == 0 disables age-based eviction (the constructor rejects negative values).
|
||||
private readonly int _replayBufferCapacity;
|
||||
private readonly TimeSpan _replayRetention;
|
||||
private readonly bool _ageEvictionEnabled;
|
||||
private readonly LinkedList<ReplayEntry> _replayBuffer = new();
|
||||
private readonly object _replayLock = new();
|
||||
private bool _anyEventSeen;
|
||||
private ulong _highestSequenceSeen;
|
||||
|
||||
private long _nextSubscriberId;
|
||||
private Task? _pumpTask;
|
||||
private bool _started;
|
||||
private bool _disposed;
|
||||
|
||||
/// <summary>
/// Initializes a per-session event distributor without a replay ring buffer.
/// </summary>
/// <param name="sessionId">Owning session id, used only for logging context.</param>
/// <param name="eventSourceFactory">
/// Factory producing the session's event stream given a cancellation token.
/// The pump consumes this exactly once. See the type remarks for the seam Task 4
/// plugs into.
/// </param>
/// <param name="subscriberQueueCapacity">
/// Bounded capacity of each per-subscriber channel. Mirrors the gRPC event-stream
/// queue capacity shape used today.
/// </param>
/// <param name="logger">Logger for pump lifecycle diagnostics.</param>
/// <param name="overflowHandler">
/// Optional per-subscriber backpressure handler, forwarded unchanged to the
/// replay-capable constructor.
/// </param>
/// <remarks>
/// Delegates to the replay-capable constructor with a replay capacity of 0, a
/// retention of 0 seconds, and <see cref="TimeProvider.System"/>, which disables the
/// replay ring buffer. Use the overload taking replay parameters to retain events for
/// reconnect/reattach replay. Kept <c>internal</c> so production wiring (Task 4) cannot
/// accidentally use the no-replay path; tests reach it via <c>InternalsVisibleTo</c>.
/// </remarks>
internal SessionEventDistributor(
    string sessionId,
    Func<CancellationToken, IAsyncEnumerable<MxEvent>> eventSourceFactory,
    int subscriberQueueCapacity,
    ILogger<SessionEventDistributor> logger,
    SubscriberOverflowHandler? overflowHandler = null)
    : this(
        sessionId: sessionId,
        eventSourceFactory: eventSourceFactory,
        subscriberQueueCapacity: subscriberQueueCapacity,
        replayBufferCapacity: 0,
        replayRetentionSeconds: 0,
        logger: logger,
        timeProvider: TimeProvider.System,
        overflowHandler: overflowHandler)
{
}
|
||||
|
||||
/// <summary>
/// Initializes a per-session event distributor with a bounded replay ring buffer.
/// </summary>
/// <param name="sessionId">Owning session id, used only for logging context.</param>
/// <param name="eventSourceFactory">
/// Factory producing the session's event stream given a cancellation token.
/// The pump consumes this exactly once. See the type remarks for the seam Task 4
/// plugs into.
/// </param>
/// <param name="subscriberQueueCapacity">
/// Bounded capacity of each per-subscriber channel; must be at least 1. Mirrors the
/// gRPC event-stream queue capacity shape used today.
/// </param>
/// <param name="replayBufferCapacity">
/// Maximum number of events retained for replay. The oldest retained event is
/// evicted once this count is exceeded. <c>0</c> disables retention entirely;
/// negative values are rejected.
/// </param>
/// <param name="replayRetentionSeconds">
/// Maximum age, in seconds, of a retained event. Entries older than this are
/// evicted regardless of capacity. <c>0</c> disables age-based eviction. Negative
/// values, and values too large to represent as a <see cref="TimeSpan"/>, are rejected.
/// </param>
/// <param name="logger">Logger for pump lifecycle diagnostics.</param>
/// <param name="timeProvider">
/// Clock used to timestamp and age-evict replay entries. Inject a fake to make
/// age-eviction deterministic in tests.
/// </param>
/// <param name="overflowHandler">
/// Optional per-subscriber backpressure handler invoked when a subscriber's bounded
/// channel is full. It records the overflow metric and, for the legacy
/// single-subscriber FailFast case, faults the owning session. The distributor always
/// disconnects the offending subscriber with an overflow fault regardless of the
/// handler. When <see langword="null"/> (unit/skeleton use) the offending subscriber is
/// still disconnected but no metric/fault side effect runs.
/// </param>
/// <exception cref="ArgumentException">
/// <paramref name="sessionId"/> is empty or whitespace.
/// </exception>
/// <exception cref="ArgumentNullException">
/// <paramref name="sessionId"/>, <paramref name="eventSourceFactory"/>,
/// <paramref name="logger"/>, or <paramref name="timeProvider"/> is <see langword="null"/>.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="subscriberQueueCapacity"/> is less than 1,
/// <paramref name="replayBufferCapacity"/> is negative, or
/// <paramref name="replayRetentionSeconds"/> is negative or exceeds
/// <see cref="TimeSpan.MaxValue"/> expressed in seconds.
/// </exception>
public SessionEventDistributor(
    string sessionId,
    Func<CancellationToken, IAsyncEnumerable<MxEvent>> eventSourceFactory,
    int subscriberQueueCapacity,
    int replayBufferCapacity,
    double replayRetentionSeconds,
    ILogger<SessionEventDistributor> logger,
    TimeProvider timeProvider,
    SubscriberOverflowHandler? overflowHandler = null)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
    ArgumentNullException.ThrowIfNull(eventSourceFactory);
    ArgumentOutOfRangeException.ThrowIfLessThan(subscriberQueueCapacity, 1);
    ArgumentOutOfRangeException.ThrowIfNegative(replayBufferCapacity);
    ArgumentOutOfRangeException.ThrowIfNegative(replayRetentionSeconds);

    // Reject retentions TimeSpan cannot hold (including +infinity) up front, so the
    // caller gets the same ArgumentOutOfRangeException as for every other bad argument
    // instead of an OverflowException escaping from TimeSpan.FromSeconds below.
    ArgumentOutOfRangeException.ThrowIfGreaterThan(replayRetentionSeconds, TimeSpan.MaxValue.TotalSeconds);
    ArgumentNullException.ThrowIfNull(logger);
    ArgumentNullException.ThrowIfNull(timeProvider);

    _sessionId = sessionId;
    _eventSourceFactory = eventSourceFactory;
    _subscriberQueueCapacity = subscriberQueueCapacity;
    _overflowHandler = overflowHandler;
    _shutdownTimeout = DefaultShutdownTimeout;
    _replayBufferCapacity = replayBufferCapacity;

    // A retention of exactly 0 switches age-based eviction off; the stored TimeSpan is
    // then a placeholder.
    _ageEvictionEnabled = replayRetentionSeconds > 0;
    _replayRetention = _ageEvictionEnabled
        ? TimeSpan.FromSeconds(replayRetentionSeconds)
        : TimeSpan.Zero;
    _logger = logger;
    _timeProvider = timeProvider;
}
|
||||
|
||||
/// <summary>
/// Gets how many subscribers are registered with the distributor right now.
/// </summary>
public int SubscriberCount
{
    get { return _subscribers.Count; }
}
|
||||
|
||||
/// <summary>
/// Launches the background pump on first use. Calling it again after a successful
/// start does nothing.
/// </summary>
/// <param name="cancellationToken">
/// Checked once on entry; it does not govern the lifetime of the pump, which runs
/// against the distributor's own shutdown token.
/// </param>
/// <returns>An already-completed task.</returns>
/// <exception cref="OperationCanceledException">
/// <paramref name="cancellationToken"/> was already cancelled on entry.
/// </exception>
/// <exception cref="ObjectDisposedException">The distributor has been disposed.</exception>
public Task StartAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    lock (_lifecycleLock)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);

        if (!_started)
        {
            _started = true;
            _pumpTask = Task.Run(() => PumpAsync(_shutdownCts.Token), CancellationToken.None);
        }
    }

    return Task.CompletedTask;
}
|
||||
|
||||
/// <summary>
/// Adds a subscriber and hands back its lease. The lease exposes the subscriber's
/// <see cref="ChannelReader{T}"/>; disposing it unregisters the subscriber and completes
/// its channel without disturbing the pump or other subscribers.
/// </summary>
/// <param name="isInternal">
/// <see langword="true"/> for a gateway-owned internal subscriber (Task 6: the
/// session's dashboard mirror) that must NOT participate in the single-subscriber
/// overflow accounting. Internal subscribers are left out of the
/// <c>isOnlySubscriber</c> count, so a lone external gRPC subscriber still reports
/// <c>isOnlySubscriber == true</c> (preserving legacy FailFast session-fault behavior)
/// while the dashboard subscriber is attached; and an internal subscriber that itself
/// overflows always reports <c>isOnlySubscriber == false</c>, so a slow/broken
/// dashboard can never fault the session — it is merely disconnected from the mirror.
/// Defaults to <see langword="false"/> (external subscriber) so every existing call
/// site is unchanged.
/// </param>
/// <returns>The lease for the newly registered subscriber.</returns>
/// <exception cref="ObjectDisposedException">The distributor has been disposed.</exception>
public IEventSubscriberLease Register(bool isInternal = false)
{
    // Channel shape:
    //  - one writer (the pump) and one reader (one gRPC stream / dashboard subscriber);
    //  - synchronous continuations disabled so a slow reader can never stall the pump on
    //    its completion;
    //  - FullMode = Wait. The pump never blocks — it only uses the non-blocking TryWrite,
    //    never the blocking WriteAsync overload — but Wait is the only
    //    BoundedChannelFullMode under which TryWrite returns false on a full channel.
    //    That false return IS the overflow signal the pump needs in order to apply the
    //    per-subscriber backpressure policy. The Drop* modes would make TryWrite silently
    //    succeed-and-drop, hiding overflow and re-introducing the silent data loss this
    //    task removes. Wait mode + TryWrite = a non-blocking pump that still detects a
    //    full subscriber channel.
    BoundedChannelOptions channelOptions = new(_subscriberQueueCapacity)
    {
        FullMode = BoundedChannelFullMode.Wait,
        SingleWriter = true,
        SingleReader = true,
        AllowSynchronousContinuations = false,
    };
    Channel<MxEvent> subscriberChannel = Channel.CreateBounded<MxEvent>(channelOptions);

    long subscriberId = Interlocked.Increment(ref _nextSubscriberId);
    Subscriber registration = new(subscriberId, subscriberChannel, isInternal);

    // Disposed check and map insert share one lock acquisition with no await between
    // them. DisposeAsync flips _disposed under this same lock before it calls
    // CompleteAllSubscribers, so once disposal has begun no subscriber can slip in —
    // closing the Register-after-DisposeAsync window that would otherwise leave a
    // subscriber's channel never completed.
    lock (_lifecycleLock)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        _subscribers[subscriberId] = registration;
    }

    return new SubscriberLease(this, registration);
}
|
||||
|
||||
/// <summary>
/// Stops the pump and completes all subscriber channels. Idempotent: only the first
/// call performs the shutdown, later calls return immediately.
/// </summary>
/// <returns>A task that completes when shutdown has finished.</returns>
public async ValueTask DisposeAsync()
{
    Task? pumpTask;
    lock (_lifecycleLock)
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        pumpTask = _pumpTask;
    }

    // Signal the pump to stop. It must not block on a non-reading subscriber:
    // it writes with non-blocking TryWrite, so cancellation tears it down promptly.
    await _shutdownCts.CancelAsync().ConfigureAwait(false);

    if (pumpTask is not null)
    {
        try
        {
            // Bound the wait: a source factory that ignores cancellation would otherwise
            // hang dispose forever. WaitAsync releases its internal timer as soon as the
            // pump finishes, so a prompt shutdown leaves no timer running for the rest
            // of the timeout window.
            await pumpTask.WaitAsync(_shutdownTimeout).ConfigureAwait(false);
        }
        catch (TimeoutException) when (!pumpTask.IsCompleted)
        {
            // The pump did not stop in time: log and proceed to complete subscribers
            // anyway; DisposeAsync must not throw on this path. The filter keeps a
            // TimeoutException thrown by the pump itself out of this branch.
            _logger.LogWarning(
                "Event distributor pump did not stop within {ShutdownTimeoutSeconds}s for session {SessionId}; completing subscribers and abandoning the pump.",
                _shutdownTimeout.TotalSeconds,
                _sessionId);
        }
        catch (OperationCanceledException)
        {
            // The pump ended by cancellation, which is the expected shutdown outcome.
        }
        catch (Exception exception)
        {
            _logger.LogDebug(
                exception,
                "Event distributor pump faulted during shutdown for session {SessionId}.",
                _sessionId);
        }
    }

    CompleteAllSubscribers(error: null);
    _shutdownCts.Dispose();
}
|
||||
|
||||
// Body of the background pump task. Enumerates the event source once and, for every
// event, first records it in the replay buffer and then offers it to each registered
// subscriber. A normal end of the source completes all subscribers without an error;
// cancellation via the supplied token leaves completion to DisposeAsync; any other
// exception is logged and used to complete all subscribers with that fault.
private async Task PumpAsync(CancellationToken cancellationToken)
{
    // Offers one event to every currently registered subscriber.
    void FanOut(MxEvent fannedEvent)
    {
        // Enumerating the dictionary's Values never throws on concurrent add/remove; a
        // subscriber registered mid-iteration may miss this event, which matches "late
        // subscribers see events after they register".
        foreach (Subscriber target in _subscribers.Values)
        {
            // TryWrite never blocks the pump on a slow reader. A false return means this
            // subscriber's bounded channel is full — the per-subscriber overflow signal.
            // The backpressure policy is applied to THIS subscriber only; the pump, the
            // session, and every other subscriber keep running. Only identifiers (worker
            // sequence, subscriber id, session) are logged, never the event payload or
            // tag values.
            bool accepted = target.Channel.Writer.TryWrite(fannedEvent);
            if (!accepted)
            {
                OnSubscriberOverflow(target, fannedEvent.WorkerSequence);
            }
        }
    }

    try
    {
        IAsyncEnumerable<MxEvent> source = _eventSourceFactory(cancellationToken);

        await foreach (MxEvent current in source
            .WithCancellation(cancellationToken)
            .ConfigureAwait(false))
        {
            // Retain for replay BEFORE fan-out so a reconnecting subscriber that queries
            // between fan-out and its own read still sees this event. Order is preserved:
            // the pump is the single appender and events arrive in source order.
            AppendToReplayBuffer(current);
            FanOut(current);
        }

        CompleteAllSubscribers(error: null);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Shutdown path: DisposeAsync completes subscribers.
    }
    catch (Exception sourceFault)
    {
        // Unexpected source fault (not the shutdown-cancellation path above) — logged at
        // Error so an event stream silently dying is not lost in Debug noise.
        _logger.LogError(
            sourceFault,
            "Event distributor source faulted for session {SessionId}.",
            _sessionId);
        CompleteAllSubscribers(sourceFault);
    }
}
|
||||
|
||||
// Handles a subscriber whose bounded channel rejected an event. The subscriber is told
// apart as "sole external subscriber" or not, the overflow handler is notified, and the
// subscriber is then removed from the fan-out set and its channel completed with an
// EventQueueOverflow fault. The removal runs even when the overflow handler throws, so a
// misbehaving handler cannot leave the overflowing subscriber attached.
//
// Sole-subscriber accounting:
//   * An internal (gateway-owned dashboard) subscriber never counts as the sole
//     subscriber, so its overflow is always reported with isOnlySubscriber == false.
//   * Internal subscribers are not counted, so a single external subscriber is still
//     reported as the sole subscriber while an internal one is attached.
//
// The count is sampled before this subscriber is removed and is not synchronized with
// concurrent register/unregister calls.
// NOTE(review): an earlier revision of this comment argued the unsynchronized sample was
// safe because multi-subscriber mode was rejected at validation time. The options
// validator now appears to accept AllowMultipleEventSubscribers=true, so confirm whether
// a concurrent attach/detach can still misreport isOnlySubscriber to the handler.
private void OnSubscriberOverflow(Subscriber subscriber, ulong workerSequence)
{
    // Only external subscribers can be "the only subscriber"; the count is skipped
    // entirely for internal ones.
    bool isOnlySubscriber = false;
    if (!subscriber.IsInternal)
    {
        isOnlySubscriber = CountExternalSubscribers() == 1;
    }

    _logger.LogDebug(
        "Event distributor disconnecting subscriber {SubscriberId} in session {SessionId} after queue overflow (worker sequence {WorkerSequence}).",
        subscriber.Id,
        _sessionId,
        workerSequence);

    // Notify the overflow handler (if any). The second argument tells it whether the
    // overflowing subscriber is internal. Any exception is logged and swallowed so the
    // disconnect below always happens.
    try
    {
        _overflowHandler?.Invoke(isOnlySubscriber, subscriber.IsInternal);
    }
    catch (Exception handlerFailure)
    {
        _logger.LogError(
            handlerFailure,
            "Event distributor overflow handler threw for session {SessionId}; disconnecting subscriber {SubscriberId} anyway.",
            _sessionId,
            subscriber.Id);
    }

    // If someone else already removed this subscriber there is nothing left to complete.
    if (!_subscribers.TryRemove(subscriber.Id, out _))
    {
        return;
    }

    // Complete only this subscriber's channel with the overflow fault; its reader
    // observes the exception once the channel is drained.
    SessionManagerException overflowFault = new(
        SessionManagerErrorCode.EventQueueOverflow,
        $"Session {_sessionId} event stream queue overflowed.");
    subscriber.Channel.Writer.TryComplete(overflowFault);
}
|
||||
|
||||
// Returns how many currently registered subscribers are external, i.e. have
// IsInternal == false. Used by OnSubscriberOverflow to decide whether the overflowing
// subscriber is the only external one.
private int CountExternalSubscribers()
{
    int externalCount = 0;
    foreach (Subscriber candidate in _subscribers.Values)
    {
        if (candidate.IsInternal)
        {
            continue;
        }

        externalCount++;
    }

    return externalCount;
}
|
||||
|
||||
// Completes the channel of every registered subscriber, passing `error` through as the
// completion fault (null means a normal completion). Subscribers stay registered; only
// their channel writers are completed.
private void CompleteAllSubscribers(Exception? error)
{
    foreach (Subscriber registered in _subscribers.Values)
    {
        ChannelWriter<MxEvent> writer = registered.Channel.Writer;
        writer.TryComplete(error);
    }
}
|
||||
|
||||
// Removes the subscriber from the fan-out set and, only if this call performed the
// removal, completes its channel without an error. A subscriber that was already
// removed is left untouched.
private void Unregister(Subscriber subscriber)
{
    bool removedHere = _subscribers.TryRemove(subscriber.Id, out _);
    if (!removedHere)
    {
        return;
    }

    subscriber.Channel.Writer.TryComplete();
}
|
||||
|
||||
/// <summary>
/// Collects every retained event whose <see cref="MxEvent.WorkerSequence"/> is strictly
/// greater than <paramref name="afterSequence"/>, in buffer order, and reports whether
/// the caller missed events that are no longer retained.
/// </summary>
/// <param name="afterSequence">
/// The last worker sequence the caller has already observed; only events with a larger
/// sequence are returned.
/// </param>
/// <param name="events">
/// Receives the retained events newer than <paramref name="afterSequence"/>. Never null;
/// empty when nothing newer is retained.
/// </param>
/// <param name="gap">
/// Receives <see langword="true"/> when the caller is behind events that can no longer be
/// replayed. Anything still retained is returned through <paramref name="events"/> even
/// when a gap is reported.
/// </param>
/// <returns>Always <see langword="true"/>; the out parameters carry the result.</returns>
/// <remarks>
/// <para>Aged-out entries are evicted before the buffer is inspected.</para>
/// <list type="bullet">
///   <item>
///     Buffer non-empty: <paramref name="gap"/> is <see langword="true"/> iff the oldest
///     retained sequence is non-zero and <paramref name="afterSequence"/> is below that
///     sequence minus one. A caller at or beyond the newest retained sequence gets an
///     empty list and no gap.
///   </item>
///   <item>
///     Buffer empty: the list is empty and <paramref name="gap"/> is
///     <see langword="true"/> iff at least one event has ever been seen and
///     <paramref name="afterSequence"/> is below the highest sequence seen.
///   </item>
/// </list>
/// </remarks>
public bool TryGetReplayFrom(ulong afterSequence, out IReadOnlyList<MxEvent> events, out bool gap)
{
    lock (_replayLock)
    {
        EvictAged();

        if (_replayBuffer.Count > 0)
        {
            ulong oldestRetained = _replayBuffer.First!.Value.Event.WorkerSequence;

            // Compare against (oldestRetained - 1) rather than (afterSequence + 1) so an
            // afterSequence of ulong.MaxValue cannot wrap to 0; the non-zero check keeps
            // the subtraction itself from wrapping.
            gap = oldestRetained != 0 && oldestRetained - 1 > afterSequence;

            // Linear scan of the retained entries, skipping those the caller already has.
            List<MxEvent> replay = new();
            foreach (ReplayEntry retained in _replayBuffer)
            {
                if (retained.Event.WorkerSequence <= afterSequence)
                {
                    continue;
                }

                replay.Add(retained.Event);
            }

            events = replay;
            return true;
        }

        // Nothing retained: the caller has a gap only if some event was seen and the
        // caller is behind the highest sequence ever seen.
        events = [];
        gap = _anyEventSeen && afterSequence < _highestSequenceSeen;
        return true;
    }
}
|
||||
|
||||
// Records an event for later replay. Under _replayLock it marks that an event has been
// seen, raises the highest-seen sequence if this event exceeds it, and — unless the
// configured capacity is 0 — appends the event with the current timestamp, trims the
// oldest entries down to capacity, and finally applies age-based eviction.
private void AppendToReplayBuffer(MxEvent mxEvent)
{
    lock (_replayLock)
    {
        _anyEventSeen = true;

        ulong sequence = mxEvent.WorkerSequence;
        if (sequence > _highestSequenceSeen)
        {
            _highestSequenceSeen = sequence;
        }

        // A capacity of 0 means nothing is retained; the bookkeeping above is still
        // updated so TryGetReplayFrom can report a gap.
        if (_replayBufferCapacity != 0)
        {
            ReplayEntry retained = new(mxEvent, _timeProvider.GetUtcNow());
            _replayBuffer.AddLast(retained);

            // Enforce the capacity bound by discarding from the front (oldest first).
            while (_replayBuffer.Count > _replayBufferCapacity)
            {
                _replayBuffer.RemoveFirst();
            }

            EvictAged();
        }
    }
}
|
||||
|
||||
// Removes entries from the front of the replay buffer whose RetainedAt timestamp is
// earlier than (now - retention window). Does nothing when age eviction is disabled or
// the buffer is empty. Callers must hold _replayLock.
private void EvictAged()
{
    if (_ageEvictionEnabled && _replayBuffer.Count != 0)
    {
        DateTimeOffset oldestAllowed = _timeProvider.GetUtcNow() - _replayRetention;

        // Stop at the first entry that is still inside the window, or when the buffer
        // has been emptied.
        for (var head = _replayBuffer.First;
             head is not null && head.Value.RetainedAt < oldestAllowed;
             head = _replayBuffer.First)
        {
            _replayBuffer.RemoveFirst();
        }
    }
}
|
||||
|
||||
// One replay-buffer slot: the retained event plus the timestamp taken when it was
// appended. RetainedAt is the value age-based eviction compares against its cutoff.
private readonly record struct ReplayEntry(MxEvent Event, DateTimeOffset RetainedAt);
|
||||
|
||||
// A single registered event consumer: its id, the channel events are written to, and a
// flag marking gateway-internal subscribers.
private sealed class Subscriber(long id, Channel<MxEvent> channel, bool isInternal)
{
    // Key under which the subscriber is stored in the distributor's subscriber map.
    public long Id => id;

    // Channel the distributor writes to and the subscriber's lease reads from.
    public Channel<MxEvent> Channel => channel;

    // True for an internal subscriber. Internal subscribers are skipped by the
    // external-subscriber count and never reported as the sole subscriber on overflow.
    public bool IsInternal => isInternal;
}
|
||||
|
||||
// Handle returned to a subscriber: exposes the read side of its channel and unregisters
// the subscriber from the distributor when disposed.
private sealed class SubscriberLease(SessionEventDistributor distributor, Subscriber subscriber)
    : IEventSubscriberLease
{
    // 0 until the first Dispose call, 1 afterwards.
    private int _leaseDisposed;

    // Read side of the subscriber's event channel.
    public ChannelReader<MxEvent> Reader => subscriber.Channel.Reader;

    // Unregisters the subscriber exactly once, even when Dispose is called repeatedly
    // or from several threads at the same time.
    public void Dispose()
    {
        bool alreadyDisposed = Interlocked.Exchange(ref _leaseDisposed, 1) != 0;
        if (alreadyDisposed)
        {
            return;
        }

        distributor.Unregister(subscriber);
    }
}
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using ZB.MOM.WW.MxGateway.Server.Configuration;
|
||||
using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
|
||||
using ZB.MOM.WW.MxGateway.Server.Grpc;
|
||||
using ZB.MOM.WW.MxGateway.Server.Metrics;
|
||||
|
||||
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
|
||||
|
||||
/// <summary>
/// Bundle of the collaborators a <see cref="GatewaySession"/> uses to build and own its
/// <see cref="SessionEventDistributor"/>. Grouping them keeps the session constructor to
/// one extra argument, and <see cref="Default"/> gives directly constructed sessions
/// (unit tests) a ready-made set without any DI wiring.
/// </summary>
/// <param name="Mapper">
/// Converts worker IPC <c>WorkerEvent</c> frames into public <c>MxEvent</c>s for the
/// distributor.
/// </param>
/// <param name="EventOptions">
/// Source of the distributor's per-subscriber queue capacity and replay-buffer bounds
/// (<see cref="EventOptions.QueueCapacity"/>,
/// <see cref="EventOptions.ReplayBufferCapacity"/>,
/// <see cref="EventOptions.ReplayRetentionSeconds"/>).
/// </param>
/// <param name="DistributorLogger">Logger handed to the distributor.</param>
/// <param name="TimeProvider">Clock used to timestamp replay entries and age them out.</param>
/// <param name="Metrics">
/// Gateway metrics sink for the session's subscriber-overflow handling (queue-overflow
/// counter and, for single-subscriber FailFast, the session fault).
/// </param>
/// <param name="DashboardBroadcaster">
/// Optional sink for mirroring session <c>MxEvent</c>s to the dashboard. When
/// <see langword="null"/> (the default) no dashboard mirror is started.
/// </param>
public sealed record SessionEventStreaming(
    MxAccessGrpcMapper Mapper,
    EventOptions EventOptions,
    ILogger<SessionEventDistributor> DistributorLogger,
    TimeProvider TimeProvider,
    GatewayMetrics Metrics,
    IDashboardEventBroadcaster? DashboardBroadcaster = null)
{
    /// <summary>
    /// Fallback dependencies for sessions constructed without explicit streaming
    /// dependencies: a new mapper, default event options, a no-op logger, the system
    /// clock, a new metrics sink, and no dashboard broadcaster. The instance is created
    /// once by the static initializer, so every reader of this property shares the same
    /// mapper, options, and metrics objects.
    /// </summary>
    public static SessionEventStreaming Default { get; } = new SessionEventStreaming(
        Mapper: new MxAccessGrpcMapper(),
        EventOptions: new EventOptions(),
        DistributorLogger: NullLogger<SessionEventDistributor>.Instance,
        TimeProvider: TimeProvider.System,
        Metrics: new GatewayMetrics(),
        DashboardBroadcaster: null);
}
|
||||
@@ -25,6 +25,9 @@ public sealed class SessionManager : ISessionManager
|
||||
private readonly ILogger<SessionManager> _logger;
|
||||
private readonly GatewayOptions _options;
|
||||
private readonly SemaphoreSlim _sessionSlots;
|
||||
private readonly Grpc.MxAccessGrpcMapper _eventMapper;
|
||||
private readonly ILogger<SessionEventDistributor> _distributorLogger;
|
||||
private readonly Dashboard.Hubs.IDashboardEventBroadcaster? _dashboardEventBroadcaster;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of <see cref="SessionManager"/>.
|
||||
@@ -35,13 +38,24 @@ public sealed class SessionManager : ISessionManager
|
||||
/// <param name="metrics">Gateway metrics.</param>
|
||||
/// <param name="timeProvider">Time provider for timestamps.</param>
|
||||
/// <param name="logger">Logger.</param>
|
||||
/// <param name="eventMapper">Mapper used by each session's event distributor to map worker events to public events.</param>
|
||||
/// <param name="distributorLogger">Logger passed to each session's event distributor pump.</param>
|
||||
/// <param name="dashboardEventBroadcaster">
|
||||
/// Dashboard SignalR fan-out sink. Each session registers an internal distributor
|
||||
/// subscriber (Task 6) that mirrors raw session events to this broadcaster, so the
|
||||
/// dashboard receives events regardless of whether a gRPC client is streaming. Null in
|
||||
/// unit tests that do not exercise the dashboard mirror.
|
||||
/// </param>
|
||||
public SessionManager(
|
||||
ISessionRegistry registry,
|
||||
ISessionWorkerClientFactory workerClientFactory,
|
||||
IOptions<GatewayOptions> options,
|
||||
GatewayMetrics metrics,
|
||||
TimeProvider? timeProvider = null,
|
||||
ILogger<SessionManager>? logger = null)
|
||||
ILogger<SessionManager>? logger = null,
|
||||
Grpc.MxAccessGrpcMapper? eventMapper = null,
|
||||
ILogger<SessionEventDistributor>? distributorLogger = null,
|
||||
Dashboard.Hubs.IDashboardEventBroadcaster? dashboardEventBroadcaster = null)
|
||||
{
|
||||
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
|
||||
_workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory));
|
||||
@@ -49,6 +63,9 @@ public sealed class SessionManager : ISessionManager
|
||||
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
_logger = logger ?? NullLogger<SessionManager>.Instance;
|
||||
_eventMapper = eventMapper ?? new Grpc.MxAccessGrpcMapper();
|
||||
_distributorLogger = distributorLogger ?? NullLogger<SessionEventDistributor>.Instance;
|
||||
_dashboardEventBroadcaster = dashboardEventBroadcaster;
|
||||
_options = options.Value;
|
||||
_sessionSlots = new SemaphoreSlim(_options.Sessions.MaxSessions, _options.Sessions.MaxSessions);
|
||||
}
|
||||
@@ -58,11 +75,13 @@ public sealed class SessionManager : ISessionManager
|
||||
/// </summary>
|
||||
/// <param name="request">Session open request.</param>
|
||||
/// <param name="clientIdentity">Client authentication identity.</param>
|
||||
/// <param name="ownerKeyId">API key identifier of the caller creating the session.</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
/// <returns>Opened gateway session.</returns>
|
||||
public async Task<GatewaySession> OpenSessionAsync(
|
||||
SessionOpenRequest request,
|
||||
string? clientIdentity,
|
||||
string? ownerKeyId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
@@ -72,7 +91,7 @@ public sealed class SessionManager : ISessionManager
|
||||
bool sessionOpenedRecorded = false;
|
||||
try
|
||||
{
|
||||
session = CreateSession(request, clientIdentity);
|
||||
session = CreateSession(request, clientIdentity, ownerKeyId);
|
||||
if (!_registry.TryAdd(session))
|
||||
{
|
||||
throw new SessionManagerException(
|
||||
@@ -420,7 +439,8 @@ public sealed class SessionManager : ISessionManager
|
||||
|
||||
private GatewaySession CreateSession(
|
||||
SessionOpenRequest request,
|
||||
string? clientIdentity)
|
||||
string? clientIdentity,
|
||||
string? ownerKeyId)
|
||||
{
|
||||
string sessionId = CreateSessionId();
|
||||
string backendName = string.IsNullOrWhiteSpace(request.RequestedBackend)
|
||||
@@ -435,19 +455,29 @@ public sealed class SessionManager : ISessionManager
|
||||
DateTimeOffset openedAt = _timeProvider.GetUtcNow();
|
||||
string clientCorrelationId = CreateClientCorrelationId(request.ClientSessionName, sessionId);
|
||||
|
||||
SessionEventStreaming eventStreaming = new(
|
||||
_eventMapper,
|
||||
_options.Events,
|
||||
_distributorLogger,
|
||||
_timeProvider,
|
||||
_metrics,
|
||||
_dashboardEventBroadcaster);
|
||||
|
||||
return new GatewaySession(
|
||||
sessionId,
|
||||
backendName,
|
||||
pipeName,
|
||||
nonce,
|
||||
clientIdentity,
|
||||
ownerKeyId,
|
||||
request.ClientSessionName,
|
||||
clientCorrelationId,
|
||||
commandTimeout,
|
||||
startupTimeout,
|
||||
shutdownTimeout,
|
||||
leaseDuration,
|
||||
openedAt);
|
||||
openedAt,
|
||||
eventStreaming);
|
||||
}
|
||||
|
||||
private static string CreateClientCorrelationId(
|
||||
|
||||
@@ -46,11 +46,14 @@
|
||||
"MaxPendingCommandsPerSession": 128,
|
||||
"DefaultLeaseSeconds": 1800,
|
||||
"LeaseSweepIntervalSeconds": 30,
|
||||
"AllowMultipleEventSubscribers": false
|
||||
"AllowMultipleEventSubscribers": false,
|
||||
"MaxEventSubscribersPerSession": 8
|
||||
},
|
||||
"Events": {
|
||||
"QueueCapacity": 10000,
|
||||
"BackpressurePolicy": "FailFast"
|
||||
"BackpressurePolicy": "FailFast",
|
||||
"ReplayBufferCapacity": 1024,
|
||||
"ReplayRetentionSeconds": 300
|
||||
},
|
||||
"Dashboard": {
|
||||
"Enabled": true,
|
||||
|
||||
@@ -410,6 +410,7 @@ public sealed class AlarmFailoverEndToEndTests
|
||||
public Task<GatewaySession> OpenSessionAsync(
|
||||
SessionOpenRequest request,
|
||||
string? clientIdentity,
|
||||
string? ownerKeyId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
GatewaySession session = new(
|
||||
|
||||
@@ -711,6 +711,7 @@ public sealed class GatewayAlarmMonitorProviderModeTests
|
||||
public Task<GatewaySession> OpenSessionAsync(
|
||||
SessionOpenRequest request,
|
||||
string? clientIdentity,
|
||||
string? ownerKeyId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
GatewaySession session = new(
|
||||
|
||||
@@ -31,9 +31,11 @@ public sealed class GatewayOptionsTests
|
||||
|
||||
Assert.Equal(30, options.Sessions.DefaultCommandTimeoutSeconds);
|
||||
Assert.Equal(64, options.Sessions.MaxSessions);
|
||||
Assert.Equal(128, options.Sessions.MaxPendingCommandsPerSession);
|
||||
Assert.Equal(1800, options.Sessions.DefaultLeaseSeconds);
|
||||
Assert.Equal(30, options.Sessions.LeaseSweepIntervalSeconds);
|
||||
Assert.False(options.Sessions.AllowMultipleEventSubscribers);
|
||||
Assert.Equal(8, options.Sessions.MaxEventSubscribersPerSession);
|
||||
|
||||
Assert.Equal(10_000, options.Events.QueueCapacity);
|
||||
Assert.Equal(EventBackpressurePolicy.FailFast, options.Events.BackpressurePolicy);
|
||||
|
||||
@@ -289,4 +289,71 @@ public sealed class GatewayOptionsValidatorTests
|
||||
Assert.True(result.Failed);
|
||||
Assert.Contains(result.Failures!, f => f.Contains(keyPart));
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// AllowMultipleEventSubscribers / MaxEventSubscribersPerSession validation
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
// Builds a new GatewayOptions that reuses every section of `source` except Sessions,
// which is replaced by the supplied `sessions` instance.
private static GatewayOptions CloneWithSessions(GatewayOptions source, SessionOptions sessions)
{
    GatewayOptions clone = new()
    {
        Authentication = source.Authentication,
        Ldap = source.Ldap,
        Worker = source.Worker,
        Sessions = sessions,
        Events = source.Events,
        Dashboard = source.Dashboard,
        Protocol = source.Protocol,
        Alarms = source.Alarms,
        Tls = source.Tls,
    };

    return clone;
}
|
||||
|
||||
/// <summary>
/// Options with <c>AllowMultipleEventSubscribers = true</c> pass validation.
/// </summary>
[Fact]
public void Validate_Succeeds_WhenAllowMultipleEventSubscribersIsTrue()
{
    GatewayOptions baseline = ValidOptions();
    SessionOptions multiSubscriberSessions = new() { AllowMultipleEventSubscribers = true };
    GatewayOptions candidate = CloneWithSessions(baseline, multiSubscriberSessions);
    GatewayOptionsValidator validator = new();

    ValidateOptionsResult outcome = validator.Validate(null, candidate);

    Assert.True(outcome.Succeeded);
}
|
||||
|
||||
/// <summary>
/// A <c>MaxEventSubscribersPerSession</c> of zero or less fails validation, and a failure
/// message names the offending configuration key.
/// </summary>
[Theory]
[InlineData(0)]
[InlineData(-1)]
public void Validate_Fails_WhenMaxEventSubscribersPerSessionBelowOne(int value)
{
    GatewayOptions baseline = ValidOptions();
    SessionOptions invalidSessions = new() { MaxEventSubscribersPerSession = value };
    GatewayOptions candidate = CloneWithSessions(baseline, invalidSessions);
    GatewayOptionsValidator validator = new();

    ValidateOptionsResult outcome = validator.Validate(null, candidate);

    Assert.True(outcome.Failed);
    Assert.Contains(
        outcome.Failures!,
        failure => failure.Contains("MxGateway:Sessions:MaxEventSubscribersPerSession"));
}
|
||||
|
||||
/// <summary>
/// Positive <c>MaxEventSubscribersPerSession</c> values pass validation.
/// </summary>
[Theory]
[InlineData(1)]
[InlineData(8)]
[InlineData(32)]
public void Validate_Succeeds_WhenMaxEventSubscribersPerSessionIsPositive(int value)
{
    GatewayOptions baseline = ValidOptions();
    SessionOptions boundedSessions = new() { MaxEventSubscribersPerSession = value };
    GatewayOptions candidate = CloneWithSessions(baseline, boundedSessions);
    GatewayOptionsValidator validator = new();

    ValidateOptionsResult outcome = validator.Validate(null, candidate);

    Assert.True(outcome.Succeeded);
}
|
||||
|
||||
/// <summary>
/// A default-constructed <c>SessionOptions</c> passes validation when substituted into
/// otherwise valid gateway options.
/// </summary>
[Fact]
public void Validate_Succeeds_WithDefaultSessionOptions()
{
    GatewayOptions baseline = ValidOptions();
    SessionOptions defaultSessions = new();
    GatewayOptions candidate = CloneWithSessions(baseline, defaultSessions);
    GatewayOptionsValidator validator = new();

    ValidateOptionsResult outcome = validator.Validate(null, candidate);

    Assert.True(outcome.Succeeded);
}
|
||||
}
|
||||
|
||||
@@ -244,6 +244,7 @@ public sealed class DashboardSessionAdminServiceTests
|
||||
public Task<GatewaySession> OpenSessionAsync(
|
||||
SessionOpenRequest request,
|
||||
string? clientIdentity,
|
||||
string? ownerKeyId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
|
||||
@@ -268,15 +268,13 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests
|
||||
workerClientFactory,
|
||||
options,
|
||||
_metrics,
|
||||
logger: NullLogger<SessionManager>.Instance);
|
||||
logger: NullLogger<SessionManager>.Instance,
|
||||
dashboardEventBroadcaster: NullDashboardEventBroadcaster.Instance);
|
||||
MxAccessGrpcMapper mapper = new();
|
||||
EventStreamService eventStreamService = new(
|
||||
sessionManager,
|
||||
options,
|
||||
mapper,
|
||||
_metrics,
|
||||
NullDashboardEventBroadcaster.Instance,
|
||||
NullLogger<EventStreamService>.Instance);
|
||||
_metrics);
|
||||
|
||||
Service = new MxAccessGatewayService(
|
||||
sessionManager,
|
||||
|
||||
@@ -9,7 +9,6 @@ using ZB.MOM.WW.MxGateway.Server.Grpc;
|
||||
using ZB.MOM.WW.MxGateway.Server.Metrics;
|
||||
using ZB.MOM.WW.MxGateway.Server.Sessions;
|
||||
using ZB.MOM.WW.MxGateway.Server.Workers;
|
||||
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
|
||||
|
||||
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Grpc;
|
||||
|
||||
@@ -157,59 +156,99 @@ public sealed class EventStreamServiceTests
|
||||
await WaitUntilAsync(() => metrics.GetSnapshot().GrpcEventStreamQueueDepth == 0);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that event queue overflow faults the session and reports the overflow metric.</summary>
|
||||
/// <summary>
|
||||
/// Re-targeted in Task 5: a per-subscriber channel overflow in the session's
|
||||
/// <see cref="SessionEventDistributor"/> faults the whole session under the legacy
|
||||
/// single-subscriber FailFast policy (the default, single-subscriber mode) and records
|
||||
/// the overflow + fault metrics. The distributor completes this subscriber's channel
|
||||
/// with the overflow fault, which surfaces here as the same
|
||||
/// <see cref="SessionManagerErrorCode.EventQueueOverflow"/> the pre-epic per-RPC
|
||||
/// overflow produced.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task StreamEventsAsync_WhenStreamQueueOverflows_FaultsSessionAndReportsOverflow()
|
||||
{
|
||||
FakeWorkerClient workerClient = new();
|
||||
GatewaySession session = CreateReadySession(workerClient);
|
||||
using GatewayMetrics metrics = new();
|
||||
GatewaySession session = CreateReadySession(
|
||||
workerClient,
|
||||
queueCapacity: 1,
|
||||
metrics: metrics,
|
||||
backpressurePolicy: EventBackpressurePolicy.FailFast);
|
||||
EventStreamService service = CreateService(
|
||||
new FakeSessionManager(session),
|
||||
metrics,
|
||||
queueCapacity: 1);
|
||||
workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
|
||||
workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
|
||||
workerClient.Events.Add(CreateWorkerEvent(sequence: 3, MxEventFamily.OnDataChange));
|
||||
for (ulong sequence = 1; sequence <= 50; sequence++)
|
||||
{
|
||||
workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
|
||||
}
|
||||
|
||||
workerClient.CompleteAfterConfiguredEvents = true;
|
||||
await using IAsyncEnumerator<MxEvent> subscriber = service
|
||||
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
|
||||
.GetAsyncEnumerator();
|
||||
|
||||
Assert.True(await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
|
||||
await WaitUntilAsync(() => session.State == SessionState.Faulted);
|
||||
// The pump fans 50 events into a capacity-1 subscriber channel faster than this
|
||||
// single reader drains, so one of the reads observes the terminal overflow fault.
|
||||
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
|
||||
async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
|
||||
async () =>
|
||||
{
|
||||
while (await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout))
|
||||
{
|
||||
}
|
||||
});
|
||||
|
||||
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, exception.ErrorCode);
|
||||
await WaitUntilAsync(() => session.State == SessionState.Faulted);
|
||||
Assert.Equal(SessionState.Faulted, session.State);
|
||||
Assert.Equal(1, metrics.GetSnapshot().QueueOverflows);
|
||||
Assert.Equal(1, metrics.GetSnapshot().Faults);
|
||||
GatewayMetricsSnapshot snapshot = metrics.GetSnapshot();
|
||||
Assert.Equal(1, snapshot.QueueOverflows);
|
||||
Assert.Equal(1, snapshot.Faults);
|
||||
// The finally block in StreamEventsAsync calls StreamDisconnected("Detached") on the
|
||||
// overflow+fault path too; pin it here so a regression removing that call is caught.
|
||||
Assert.Equal(1, snapshot.StreamDisconnects);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that the disconnect backpressure policy disconnects the subscriber without faulting the session.</summary>
|
||||
/// <summary>
|
||||
/// Re-targeted in Task 5: under the DisconnectSubscriber policy a per-subscriber
|
||||
/// channel overflow disconnects only that subscriber's stream (terminal
|
||||
/// <see cref="SessionManagerErrorCode.EventQueueOverflow"/>) and records the overflow
|
||||
/// metric, but leaves the session <see cref="SessionState.Ready"/> and records no
|
||||
/// fault. The session, pump, and any other subscribers are unaffected.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task StreamEventsAsync_WhenStreamQueueOverflowsWithDisconnectPolicy_LeavesSessionReady()
|
||||
{
|
||||
FakeWorkerClient workerClient = new();
|
||||
GatewaySession session = CreateReadySession(workerClient);
|
||||
using GatewayMetrics metrics = new();
|
||||
GatewaySession session = CreateReadySession(
|
||||
workerClient,
|
||||
queueCapacity: 1,
|
||||
metrics: metrics,
|
||||
backpressurePolicy: EventBackpressurePolicy.DisconnectSubscriber);
|
||||
EventStreamService service = CreateService(
|
||||
new FakeSessionManager(session),
|
||||
metrics,
|
||||
queueCapacity: 1,
|
||||
backpressurePolicy: EventBackpressurePolicy.DisconnectSubscriber);
|
||||
workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
|
||||
workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
|
||||
workerClient.Events.Add(CreateWorkerEvent(sequence: 3, MxEventFamily.OnDataChange));
|
||||
for (ulong sequence = 1; sequence <= 50; sequence++)
|
||||
{
|
||||
workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
|
||||
}
|
||||
|
||||
workerClient.CompleteAfterConfiguredEvents = true;
|
||||
await using IAsyncEnumerator<MxEvent> subscriber = service
|
||||
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
|
||||
.GetAsyncEnumerator();
|
||||
|
||||
Assert.True(await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
|
||||
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
|
||||
async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
|
||||
async () =>
|
||||
{
|
||||
while (await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout))
|
||||
{
|
||||
}
|
||||
});
|
||||
|
||||
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, exception.ErrorCode);
|
||||
Assert.Equal(SessionState.Ready, session.State);
|
||||
@@ -261,81 +300,11 @@ public sealed class EventStreamServiceTests
|
||||
Assert.Equal(1, metrics.GetSnapshot().Faults);
|
||||
}
|
||||
|
||||
/// <summary>
/// Tests-026 regression: each event yielded by
/// <see cref="EventStreamService.StreamEventsAsync"/> must also reach the
/// <see cref="ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster"/>
/// seam, the only path that fans events out to dashboard SignalR clients.
/// Before this fixture existed, silently losing the <c>Publish</c> call (an
/// accidental <c>if</c> around it, or the broadcaster constructor parameter
/// being removed) produced no failing test. The recording fake captures every
/// call, so the test requires exactly one publish per yielded event, carrying
/// the session id and the original <c>WorkerSequence</c>.
/// </summary>
[Fact]
public async Task StreamEventsAsync_PublishesEachEventToDashboardBroadcaster()
{
    ulong[] expectedSequences = [7UL, 8UL];
    MxEventFamily[] expectedFamilies = [MxEventFamily.OnDataChange, MxEventFamily.OnWriteComplete];

    FakeWorkerClient worker = new();
    GatewaySession readySession = CreateReadySession(worker);
    RecordingDashboardEventBroadcaster recorder = new();
    FakeSessionManager sessions = new(readySession);
    EventStreamService streamService = CreateService(sessions, dashboardEventBroadcaster: recorder);
    worker.Events.Add(CreateWorkerEvent(sequence: 7, MxEventFamily.OnDataChange));
    worker.Events.Add(CreateWorkerEvent(sequence: 8, MxEventFamily.OnWriteComplete));
    worker.CompleteAfterConfiguredEvents = true;

    List<MxEvent> streamed = await CollectEventsAsync(streamService, readySession.SessionId);

    // The gRPC side sees both events in order...
    Assert.Equal(expectedSequences, streamed.Select(streamedEvent => streamedEvent.WorkerSequence).ToArray());

    // ...and the broadcaster saw the same two, tagged with this session.
    IReadOnlyList<DashboardEventCapture> published = recorder.Captures;
    Assert.Equal(2, published.Count);
    Assert.All(published, entry => Assert.Equal(readySession.SessionId, entry.SessionId));
    Assert.Equal(expectedSequences, published.Select(entry => entry.MxEvent.WorkerSequence).ToArray());
    Assert.Equal(expectedFamilies, published.Select(entry => entry.MxEvent.Family).ToArray());
}
|
||||
|
||||
/// <summary>
/// Server-041 regression: a throwing dashboard broadcaster must not abort the
/// gRPC stream served by <see cref="EventStreamService"/>.
/// <c>IDashboardEventBroadcaster.Publish</c> is documented as best-effort and
/// never-throw, but the gRPC consumer cannot depend on implementation
/// discipline alone: the seam itself swallows the fault and logs at debug,
/// mirroring the broadcaster's own continuation handler. Without that wrap the
/// producer loop would surface the exception and the client would see a
/// faulted stream because of a dashboard-mirror failure.
/// </summary>
[Fact]
public async Task StreamEventsAsync_WhenDashboardBroadcasterThrows_StillYieldsEventsAndDoesNotFaultSession()
{
    ulong[] expectedSequences = [1UL, 2UL];

    FakeWorkerClient worker = new();
    GatewaySession readySession = CreateReadySession(worker);
    using GatewayMetrics metrics = new();
    ThrowingDashboardEventBroadcaster faultyBroadcaster = new();
    FakeSessionManager sessions = new(readySession);
    EventStreamService streamService = CreateService(
        sessions,
        metrics,
        dashboardEventBroadcaster: faultyBroadcaster);
    foreach (ulong sequence in expectedSequences)
    {
        worker.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
    }

    worker.CompleteAfterConfiguredEvents = true;

    List<MxEvent> streamed = await CollectEventsAsync(streamService, readySession.SessionId);

    // Both events were delivered, a publish was attempted for each, and the
    // session did not end up faulted.
    Assert.Equal(expectedSequences, streamed.Select(streamedEvent => streamedEvent.WorkerSequence).ToArray());
    Assert.Equal(2, faultyBroadcaster.PublishAttempts);
    Assert.NotEqual(SessionState.Faulted, readySession.State);
}
|
||||
|
||||
private static EventStreamService CreateService(
|
||||
FakeSessionManager sessionManager,
|
||||
GatewayMetrics? metrics = null,
|
||||
int queueCapacity = 8,
|
||||
EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast,
|
||||
ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster? dashboardEventBroadcaster = null)
|
||||
EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast)
|
||||
{
|
||||
return new EventStreamService(
|
||||
sessionManager,
|
||||
@@ -347,25 +316,7 @@ public sealed class EventStreamServiceTests
|
||||
BackpressurePolicy = backpressurePolicy,
|
||||
},
|
||||
}),
|
||||
new MxAccessGrpcMapper(),
|
||||
metrics ?? new GatewayMetrics(),
|
||||
dashboardEventBroadcaster ?? NullDashboardEventBroadcaster.Instance,
|
||||
NullLogger<EventStreamService>.Instance);
|
||||
}
|
||||
|
||||
private sealed class ThrowingDashboardEventBroadcaster : ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster
|
||||
{
|
||||
/// <summary>Gets the count of publish attempts.</summary>
|
||||
public int PublishAttempts { get; private set; }
|
||||
|
||||
/// <summary>Increments the attempt count and throws a simulated failure.</summary>
|
||||
/// <param name="sessionId">The session identifier.</param>
|
||||
/// <param name="mxEvent">The event to publish.</param>
|
||||
public void Publish(string sessionId, MxEvent mxEvent)
|
||||
{
|
||||
PublishAttempts++;
|
||||
throw new InvalidOperationException("simulated dashboard broadcaster failure");
|
||||
}
|
||||
metrics ?? new GatewayMetrics());
|
||||
}
|
||||
|
||||
private static async Task<List<MxEvent>> CollectEventsAsync(
|
||||
@@ -393,20 +344,39 @@ public sealed class EventStreamServiceTests
|
||||
|
||||
private static GatewaySession CreateReadySession(
|
||||
FakeWorkerClient workerClient,
|
||||
string sessionId = "session-events")
|
||||
string sessionId = "session-events",
|
||||
int queueCapacity = 8,
|
||||
GatewayMetrics? metrics = null,
|
||||
EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast)
|
||||
{
|
||||
// The per-subscriber overflow policy now lives in the session's
|
||||
// SessionEventDistributor, so the session must share the same metrics sink and
|
||||
// backpressure policy the overflow assertions observe. queueCapacity flows into the
|
||||
// distributor's per-subscriber channel bound, which is what overflows.
|
||||
GatewaySession session = new(
|
||||
sessionId,
|
||||
GatewayContractInfo.DefaultBackendName,
|
||||
"pipe",
|
||||
"nonce",
|
||||
"client",
|
||||
ownerKeyId: null,
|
||||
"client-session",
|
||||
"client-correlation",
|
||||
TimeSpan.FromSeconds(30),
|
||||
TimeSpan.FromSeconds(30),
|
||||
TimeSpan.FromSeconds(10),
|
||||
DateTimeOffset.UtcNow);
|
||||
TimeSpan.FromMinutes(30),
|
||||
DateTimeOffset.UtcNow,
|
||||
new SessionEventStreaming(
|
||||
new MxAccessGrpcMapper(),
|
||||
new EventOptions
|
||||
{
|
||||
QueueCapacity = queueCapacity,
|
||||
BackpressurePolicy = backpressurePolicy,
|
||||
},
|
||||
NullLogger<SessionEventDistributor>.Instance,
|
||||
TimeProvider.System,
|
||||
metrics ?? new GatewayMetrics()));
|
||||
session.AttachWorkerClient(workerClient);
|
||||
session.MarkReady();
|
||||
|
||||
@@ -471,6 +441,7 @@ public sealed class EventStreamServiceTests
|
||||
public Task<GatewaySession> OpenSessionAsync(
|
||||
SessionOpenRequest request,
|
||||
string? clientIdentity,
|
||||
string? ownerKeyId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.FromResult(_sessions.Values.First());
|
||||
|
||||
@@ -884,10 +884,12 @@ public sealed class MxAccessGatewayServiceConstraintTests
|
||||
/// <summary>Returns the first seeded session as the result of an open request.</summary>
/// <param name="request">The session open request (not inspected by this fake).</param>
/// <param name="clientIdentity">The client identity, if any (not inspected by this fake).</param>
/// <param name="ownerKeyId">The API key identifier of the caller, if any (not inspected by this fake).</param>
/// <param name="cancellationToken">Token to observe for cancellation (not observed by this fake).</param>
/// <returns>A completed task holding the first value in the seeded session collection.</returns>
public Task<GatewaySession> OpenSessionAsync(
    SessionOpenRequest request,
    string? clientIdentity,
    string? ownerKeyId,
    CancellationToken cancellationToken)
{
    GatewaySession firstSeeded = seededSessions.Values.First();
    return Task.FromResult(firstSeeded);
}
|
||||
|
||||
|
||||
@@ -45,6 +45,7 @@ public sealed class MxAccessGatewayServiceTests
|
||||
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||
Assert.Contains("unary-invoke", reply.Capabilities);
|
||||
Assert.Equal("Operator Key", sessionManager.LastClientIdentity);
|
||||
Assert.Equal("operator01", sessionManager.LastOwnerKeyId);
|
||||
Assert.Equal("operator-session", sessionManager.LastOpenRequest?.ClientSessionName);
|
||||
}
|
||||
|
||||
@@ -508,6 +509,9 @@ public sealed class MxAccessGatewayServiceTests
|
||||
/// <summary>The last client identity passed to OpenSessionAsync.</summary>
|
||||
public string? LastClientIdentity { get; private set; }
|
||||
|
||||
/// <summary>The last owner key ID passed to OpenSessionAsync.</summary>
|
||||
public string? LastOwnerKeyId { get; private set; }
|
||||
|
||||
/// <summary>The last session ID passed to ReadEventsAsync.</summary>
|
||||
public string? LastReadEventsSessionId { get; private set; }
|
||||
|
||||
@@ -545,10 +549,12 @@ public sealed class MxAccessGatewayServiceTests
|
||||
public Task<GatewaySession> OpenSessionAsync(
    SessionOpenRequest request,
    string? clientIdentity,
    string? ownerKeyId,
    CancellationToken cancellationToken)
{
    // Record every argument so tests can assert what the caller forwarded.
    LastOwnerKeyId = ownerKeyId;
    LastClientIdentity = clientIdentity;
    LastOpenRequest = request;

    // Prefer the session a test configured; otherwise fabricate a default one.
    GatewaySession opened = OpenSessionResult ?? CreateSession("session-1", processId: 1234);
    return Task.FromResult(opened);
}
|
||||
|
||||
@@ -0,0 +1,323 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.MxGateway.Contracts;
|
||||
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
||||
using ZB.MOM.WW.MxGateway.Server.Configuration;
|
||||
using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
|
||||
using ZB.MOM.WW.MxGateway.Server.Grpc;
|
||||
using ZB.MOM.WW.MxGateway.Server.Metrics;
|
||||
using ZB.MOM.WW.MxGateway.Server.Sessions;
|
||||
using ZB.MOM.WW.MxGateway.Server.Workers;
|
||||
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
|
||||
|
||||
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;
|
||||
|
||||
/// <summary>
|
||||
/// Task 6 regression tests for the internal dashboard mirror. The dashboard is a
|
||||
/// first-class subscriber on the session's <see cref="SessionEventDistributor"/>, so it
|
||||
/// receives session events whether or not a gRPC client is streaming — fixing the
|
||||
/// "dark feed" where the dashboard only saw events while a gRPC client was actively
|
||||
/// streaming (the inline per-RPC tap removed by this task).
|
||||
/// </summary>
|
||||
public sealed class GatewaySessionDashboardMirrorTests
|
||||
{
|
||||
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
|
||||
|
||||
/// <summary>
/// The key bug-fix test: with no gRPC <c>StreamEvents</c> subscriber attached, the
/// dashboard broadcaster must still receive the session's events. The session is
/// driven to Ready on top of a fake worker that emits two events, so the internal
/// dashboard subscriber is the only consumer. Before Task 6 the mirror lived inside
/// the per-RPC gRPC loop, which meant the dashboard saw nothing without a gRPC
/// subscriber.
/// </summary>
[Fact]
public async Task DashboardMirror_ReceivesEvents_WithNoGrpcSubscriber()
{
    ulong[] expectedSequences = [10UL, 11UL];
    FakeWorkerClient worker = new()
    {
        Events =
        {
            CreateWorkerEvent(10, MxEventFamily.OnDataChange),
            CreateWorkerEvent(11, MxEventFamily.OnWriteComplete),
        },
        CompleteAfterConfiguredEvents = true,
    };
    RecordingDashboardEventBroadcaster broadcaster = new();

    await using GatewaySession session = CreateSession(worker, broadcaster);
    session.AttachWorkerClient(worker);

    // MarkReady starts the internal dashboard mirror; no gRPC subscriber is ever attached.
    session.MarkReady();

    await WaitUntilAsync(() => broadcaster.Captures.Count == 2);

    IReadOnlyList<DashboardEventCapture> mirrored = broadcaster.Captures;
    Assert.Equal(0, session.ActiveEventSubscriberCount);
    Assert.Equal(expectedSequences, mirrored.Select(entry => entry.MxEvent.WorkerSequence).ToArray());
    Assert.All(mirrored, entry => Assert.Equal(session.SessionId, entry.SessionId));
}
|
||||
|
||||
/// <summary>
/// A gRPC subscriber and the dashboard both receive every event concurrently. The
/// gRPC path is no longer the dashboard's source — both read independent leases fed by
/// the single distributor pump. The gRPC enumeration is bounded by
/// <see cref="TestTimeout"/> so a stream that never completes fails the test quickly
/// instead of hanging the whole run.
/// </summary>
[Fact]
public async Task DashboardMirror_AndGrpcSubscriber_BothReceiveEvents()
{
    FakeWorkerClient workerClient = new();
    workerClient.Events.Add(CreateWorkerEvent(1, MxEventFamily.OnDataChange));
    workerClient.Events.Add(CreateWorkerEvent(2, MxEventFamily.OnDataChange));
    workerClient.Events.Add(CreateWorkerEvent(3, MxEventFamily.OnWriteComplete));
    workerClient.CompleteAfterConfiguredEvents = true;
    RecordingDashboardEventBroadcaster broadcaster = new();

    await using GatewaySession session = CreateSession(workerClient, broadcaster);
    session.AttachWorkerClient(workerClient);
    session.MarkReady();

    // The metrics sink is disposable, so the test owns it explicitly rather than
    // handing an anonymous instance to the service and never releasing it.
    using GatewayMetrics serviceMetrics = new();
    EventStreamService service = new(
        new SingleSessionManager(session),
        Options.Create(new GatewayOptions { Events = new EventOptions { QueueCapacity = 8 } }),
        serviceMetrics);

    // Every other wait in this class is bounded; bound the stream read as well so a
    // subscriber that is never completed surfaces as a failure, not a hang.
    using CancellationTokenSource streamTimeout = new(TestTimeout);
    List<MxEvent> grpcEvents = [];
    await foreach (MxEvent mxEvent in service
        .StreamEventsAsync(new StreamEventsRequest { SessionId = session.SessionId }, streamTimeout.Token)
        .WithCancellation(streamTimeout.Token))
    {
        grpcEvents.Add(mxEvent);
    }

    await WaitUntilAsync(() => broadcaster.Captures.Count == 3);

    Assert.Equal([1UL, 2UL, 3UL], grpcEvents.Select(mxEvent => mxEvent.WorkerSequence).ToArray());
    Assert.Equal([1UL, 2UL, 3UL], broadcaster.Captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray());
}
|
||||
|
||||
/// <summary>
/// Task 4 hazard guard. Starting the pump at Ready against a worker stream that
/// completes quickly, with zero subscribers, used to drain the stream into nothing and
/// leave a later subscriber hanging. The dashboard subscriber is now registered BEFORE
/// the pump starts, so a worker stream that completes immediately still delivers every
/// event to the dashboard, with no hang.
/// </summary>
[Fact]
public async Task DashboardMirror_FastCompletingWorkerStream_DeliversAllEventsWithoutHang()
{
    ulong[] expectedSequences = [1UL, 2UL];
    FakeWorkerClient worker = new()
    {
        Events =
        {
            CreateWorkerEvent(1, MxEventFamily.OnDataChange),
            CreateWorkerEvent(2, MxEventFamily.OnDataChange),
        },
        CompleteAfterConfiguredEvents = true,
    };
    RecordingDashboardEventBroadcaster broadcaster = new();

    await using GatewaySession session = CreateSession(worker, broadcaster);
    session.AttachWorkerClient(worker);
    session.MarkReady();

    await WaitUntilAsync(() => broadcaster.Captures.Count == 2);

    ulong[] mirroredSequences = broadcaster.Captures
        .Select(entry => entry.MxEvent.WorkerSequence)
        .ToArray();
    Assert.Equal(expectedSequences, mirroredSequences);
}
|
||||
|
||||
/// <summary>
/// The dashboard <c>Publish</c> call must be never-throw at the seam as well: a
/// broadcaster that throws on every call must neither fault the session nor stop the
/// mirror from moving on to the next event. The test waits until a publish has been
/// attempted for both events, then checks the session state.
/// </summary>
[Fact]
public async Task DashboardMirror_WhenBroadcasterThrows_DoesNotFaultSessionAndKeepsMirroring()
{
    FakeWorkerClient worker = new()
    {
        Events =
        {
            CreateWorkerEvent(1, MxEventFamily.OnDataChange),
            CreateWorkerEvent(2, MxEventFamily.OnDataChange),
        },
        CompleteAfterConfiguredEvents = true,
    };
    ThrowingDashboardEventBroadcaster broadcaster = new();

    await using GatewaySession session = CreateSession(worker, broadcaster);
    session.AttachWorkerClient(worker);
    session.MarkReady();

    // Reaching two attempts proves the mirror continued past the first failure.
    await WaitUntilAsync(() => broadcaster.PublishAttempts == 2);

    SessionState stateAfterFailures = session.State;
    Assert.NotEqual(SessionState.Faulted, stateAfterFailures);
}
|
||||
|
||||
/// <summary>
/// The internal dashboard subscriber must NOT count against the single-subscriber
/// guard. With the mirror running, the active subscriber count stays at zero and a
/// gRPC subscriber can still attach with <c>allowMultipleSubscribers: false</c>,
/// which brings the count to one.
/// </summary>
[Fact]
public async Task DashboardMirror_DoesNotCountAgainstSingleSubscriberGuard()
{
    RecordingDashboardEventBroadcaster broadcaster = new();
    FakeWorkerClient worker = new();

    await using GatewaySession session = CreateSession(worker, broadcaster);
    session.AttachWorkerClient(worker);
    session.MarkReady();

    int countBeforeAttach = session.ActiveEventSubscriberCount;
    Assert.Equal(0, countBeforeAttach);

    using IEventSubscriberLease grpcLease = session.AttachEventSubscriber(allowMultipleSubscribers: false);

    int countAfterAttach = session.ActiveEventSubscriberCount;
    Assert.Equal(1, countAfterAttach);
}
|
||||
|
||||
/// <summary>
/// Builds the session used by these tests, wiring <paramref name="broadcaster"/> into
/// the session's event streaming. The session is returned as constructed: callers
/// attach the worker client and mark it ready themselves.
/// </summary>
/// <param name="workerClient">The worker client for the test (not referenced while building the session).</param>
/// <param name="broadcaster">The dashboard broadcaster handed to the event streaming.</param>
/// <returns>A new session with id <c>session-dashboard-mirror</c>.</returns>
private static GatewaySession CreateSession(
    IWorkerClient workerClient,
    IDashboardEventBroadcaster broadcaster)
{
    TimeSpan fiveSeconds = TimeSpan.FromSeconds(5);
    SessionEventStreaming streaming = new(
        new MxAccessGrpcMapper(),
        new EventOptions { QueueCapacity = 8 },
        NullLogger<SessionEventDistributor>.Instance,
        TimeProvider.System,
        new GatewayMetrics(),
        broadcaster);

    GatewaySession created = new(
        sessionId: "session-dashboard-mirror",
        backendName: GatewayContractInfo.DefaultBackendName,
        pipeName: "mxaccess-gateway-1-session-dashboard-mirror",
        nonce: "nonce",
        clientIdentity: "client-1",
        ownerKeyId: null,
        clientSessionName: "test-session",
        clientCorrelationId: "client-correlation-1",
        commandTimeout: fiveSeconds,
        startupTimeout: fiveSeconds,
        shutdownTimeout: fiveSeconds,
        leaseDuration: TimeSpan.FromMinutes(30),
        openedAt: DateTimeOffset.UtcNow,
        eventStreaming: streaming);
    return created;
}
|
||||
|
||||
/// <summary>
/// Wraps a new <see cref="MxEvent"/> for the dashboard-mirror session in a
/// <see cref="WorkerEvent"/>. An empty payload is attached for the two families these
/// tests use; any other family gets no payload.
/// </summary>
/// <param name="sequence">The worker sequence number stamped on the event.</param>
/// <param name="family">The event family, which also selects the payload to attach.</param>
/// <returns>A worker event carrying the constructed <see cref="MxEvent"/>.</returns>
private static WorkerEvent CreateWorkerEvent(ulong sequence, MxEventFamily family)
{
    MxEvent payload = new()
    {
        SessionId = "session-dashboard-mirror",
        Family = family,
        WorkerSequence = sequence,
    };

    if (family == MxEventFamily.OnDataChange)
    {
        payload.OnDataChange = new OnDataChangeEvent();
    }
    else if (family == MxEventFamily.OnWriteComplete)
    {
        payload.OnWriteComplete = new OnWriteCompleteEvent();
    }

    return new WorkerEvent { Event = payload };
}
|
||||
|
||||
/// <summary>
/// Polls <paramref name="predicate"/> every 10 ms until it returns <c>true</c>, and
/// fails the test if <see cref="TestTimeout"/> elapses first.
/// </summary>
/// <param name="predicate">The condition to wait for.</param>
/// <param name="condition">
/// Source text of <paramref name="predicate"/>, supplied by the compiler and used only
/// in the timeout failure message.
/// </param>
private static async Task WaitUntilAsync(Func<bool> predicate, [CallerArgumentExpression(nameof(predicate))] string? condition = null)
{
    TimeSpan pollInterval = TimeSpan.FromMilliseconds(10);
    using CancellationTokenSource deadline = new(TestTimeout);

    try
    {
        while (true)
        {
            if (predicate())
            {
                return;
            }

            // The delay is cancelled once the deadline passes, which ends the wait.
            await Task.Delay(pollInterval, deadline.Token);
        }
    }
    catch (OperationCanceledException)
    {
        Assert.Fail($"Timed out after {TestTimeout.TotalSeconds}s waiting for: {condition}");
    }
}
|
||||
|
||||
/// <summary>
/// Broadcaster fake whose <see cref="Publish"/> counts the call and then always throws.
/// </summary>
private sealed class ThrowingDashboardEventBroadcaster : IDashboardEventBroadcaster
{
    private int _attemptCount;

    /// <summary>Gets the number of <see cref="Publish"/> calls made so far.</summary>
    public int PublishAttempts
    {
        get { return Volatile.Read(ref _attemptCount); }
    }

    /// <summary>Records the attempt, then throws a simulated failure.</summary>
    /// <param name="sessionId">The session identifier (unused).</param>
    /// <param name="mxEvent">The event to publish (unused).</param>
    /// <exception cref="InvalidOperationException">Always thrown.</exception>
    public void Publish(string sessionId, MxEvent mxEvent)
    {
        // Count first so the attempt is visible even though the call fails.
        Interlocked.Increment(ref _attemptCount);
        throw new InvalidOperationException("simulated dashboard broadcaster failure");
    }
}
|
||||
|
||||
/// <summary>
/// <see cref="ISessionManager"/> fake that serves exactly one pre-built session. Event
/// reads are forwarded to that session; every other operation returns a canned result.
/// </summary>
private sealed class SingleSessionManager(GatewaySession session) : ISessionManager
{
    /// <summary>Returns the wrapped session regardless of the request.</summary>
    public Task<GatewaySession> OpenSessionAsync(
        SessionOpenRequest request,
        string? clientIdentity,
        string? ownerKeyId,
        CancellationToken cancellationToken)
    {
        return Task.FromResult(session);
    }

    /// <summary>
    /// Always hands back the wrapped session through <paramref name="gatewaySession"/>;
    /// the return value reports whether <paramref name="sessionId"/> matches it (ordinal).
    /// </summary>
    public bool TryGetSession(string sessionId, out GatewaySession gatewaySession)
    {
        bool isMatch = string.Equals(sessionId, session.SessionId, StringComparison.Ordinal);
        gatewaySession = session;
        return isMatch;
    }

    /// <summary>Returns an empty command reply without invoking anything.</summary>
    public Task<WorkerCommandReply> InvokeAsync(
        string sessionId,
        WorkerCommand command,
        CancellationToken cancellationToken)
    {
        WorkerCommandReply emptyReply = new();
        return Task.FromResult(emptyReply);
    }

    /// <summary>Forwards to the wrapped session's event stream, ignoring <paramref name="sessionId"/>.</summary>
    public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
        string sessionId,
        CancellationToken cancellationToken)
    {
        return session.ReadEventsAsync(cancellationToken);
    }

    /// <summary>Reports the session as newly closed without touching the wrapped session.</summary>
    public Task<SessionCloseResult> CloseSessionAsync(
        string sessionId,
        CancellationToken cancellationToken)
    {
        return ClosedResult(sessionId);
    }

    /// <summary>Reports the session as newly closed without touching the wrapped session.</summary>
    public Task<SessionCloseResult> KillWorkerAsync(
        string sessionId,
        string reason,
        CancellationToken cancellationToken)
    {
        return ClosedResult(sessionId);
    }

    /// <summary>Reports that no leases were closed.</summary>
    public Task<int> CloseExpiredLeasesAsync(
        DateTimeOffset now,
        CancellationToken cancellationToken)
    {
        return Task.FromResult(0);
    }

    /// <summary>Completes immediately; there is nothing to shut down.</summary>
    public Task ShutdownAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    // Shared canned result for the close and kill paths.
    private static Task<SessionCloseResult> ClosedResult(string sessionId)
    {
        return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
    }
}
|
||||
|
||||
/// <summary>
/// <see cref="IWorkerClient"/> fake that replays a configured list of events and then
/// either completes the stream or parks until cancelled.
/// </summary>
private sealed class FakeWorkerClient : IWorkerClient
{
    /// <summary>Gets the events replayed, in order, by <see cref="ReadEventsAsync"/>.</summary>
    public List<WorkerEvent> Events { get; } = new();

    /// <summary>
    /// Gets or sets whether the event stream ends after the configured events. When
    /// <c>false</c> the stream stays open until the read is cancelled.
    /// </summary>
    public bool CompleteAfterConfiguredEvents { get; set; }

    /// <summary>Gets the fixed session id reported by this fake.</summary>
    public string SessionId => "session-dashboard-mirror";

    /// <summary>Gets the fixed process id reported by this fake.</summary>
    public int? ProcessId => 1234;

    /// <summary>Gets the state reported by this fake, which is always Ready.</summary>
    public WorkerClientState State => WorkerClientState.Ready;

    /// <summary>Gets the heartbeat timestamp, captured once when the fake is created.</summary>
    public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;

    /// <summary>Completes immediately; there is no worker to start.</summary>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>Returns an empty reply without executing <paramref name="command"/>.</summary>
    public Task<WorkerCommandReply> InvokeAsync(
        WorkerCommand command,
        TimeSpan timeout,
        CancellationToken cancellationToken)
    {
        WorkerCommandReply emptyReply = new();
        return Task.FromResult(emptyReply);
    }

    /// <summary>
    /// Yields every configured event, checking for cancellation before each one, then
    /// ends the stream or waits for cancellation depending on
    /// <see cref="CompleteAfterConfiguredEvents"/>.
    /// </summary>
    public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        foreach (WorkerEvent configured in Events)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return configured;
        }

        if (!CompleteAfterConfiguredEvents)
        {
            // Keep the stream open until the reader cancels.
            await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
        }
    }

    /// <summary>Completes immediately; there is no worker to shut down.</summary>
    public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>Does nothing; there is no worker process to kill.</summary>
    public void Kill(string reason)
    {
    }

    /// <summary>Completes immediately; the fake holds no resources.</summary>
    public ValueTask DisposeAsync()
    {
        return ValueTask.CompletedTask;
    }
}
|
||||
}
|
||||
@@ -1,5 +1,9 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
||||
using ZB.MOM.WW.MxGateway.Server.Configuration;
|
||||
using ZB.MOM.WW.MxGateway.Server.Grpc;
|
||||
using ZB.MOM.WW.MxGateway.Server.Metrics;
|
||||
using ZB.MOM.WW.MxGateway.Server.Sessions;
|
||||
using ZB.MOM.WW.MxGateway.Server.Workers;
|
||||
|
||||
@@ -156,6 +160,66 @@ public sealed class GatewaySessionTests
|
||||
await session.DisposeAsync();
|
||||
}
|
||||
|
||||
/// <summary>
/// Issue-1 regression. When several callers race <c>Dispose()</c> on the same
/// <see cref="IEventSubscriberLease"/> — for example a gRPC stream completion and a
/// client cancellation firing together — <c>_activeEventSubscriberCount</c> must drop
/// exactly once and never reach −1. A negative count would block every later
/// subscriber, because <c>AttachEventSubscriber(allowMultipleSubscribers:false)</c>
/// gates on <c>_activeEventSubscriberCount > 0</c>. After each round of racing
/// disposes the count must be exactly 0 and a fresh single-subscriber attach must
/// succeed.
/// </summary>
[Fact]
public async Task EventSubscriberLease_ConcurrentDispose_DecrementsCountExactlyOnce()
{
    const int Concurrency = 16;
    const int Iterations = 200;
    TimeSpan testTimeout = TimeSpan.FromSeconds(10);

    FakeWorkerClient workerClient = new();
    GatewaySession session = CreateReadySessionWithEventStreaming(workerClient);

    int roundsLeft = Iterations;
    while (roundsLeft-- > 0)
    {
        // One attached subscriber takes the active count to 1.
        IEventSubscriberLease contested = session.AttachEventSubscriber(allowMultipleSubscribers: false);

        // Every racer parks on the gate so they are released together, which widens
        // the race window as far as the scheduler allows. Only one of the Dispose
        // calls may actually detach the subscriber.
        using SemaphoreSlim startGate = new(0);
        Task[] racers = Enumerable.Range(0, Concurrency)
            .Select(_ => Task.Run(async () =>
            {
                await startGate.WaitAsync(testTimeout);
                contested.Dispose();
            }))
            .ToArray();

        startGate.Release(Concurrency);
        await Task.WhenAll(racers).WaitAsync(testTimeout);

        // Exactly zero, not negative, once every dispose has run.
        Assert.Equal(0, session.ActiveEventSubscriberCount);

        // Observable contract: the single-subscriber guard lets a new lease in.
        IEventSubscriberLease followUp = session.AttachEventSubscriber(allowMultipleSubscribers: false);
        followUp.Dispose();
        Assert.Equal(0, session.ActiveEventSubscriberCount);
    }

    await session.CloseAsync("test-done", CancellationToken.None);
    await session.DisposeAsync();
}
|
||||
|
||||
private static GatewaySession CreateReadySession(IWorkerClient workerClient)
|
||||
{
|
||||
GatewaySession session = new(
|
||||
@@ -164,6 +228,7 @@ public sealed class GatewaySessionTests
|
||||
pipeName: "mxaccess-gateway-1-session-test",
|
||||
nonce: "nonce",
|
||||
clientIdentity: "client-1",
|
||||
ownerKeyId: null,
|
||||
clientSessionName: "test-session",
|
||||
clientCorrelationId: "client-correlation-1",
|
||||
commandTimeout: TimeSpan.FromSeconds(5),
|
||||
@@ -176,6 +241,33 @@ public sealed class GatewaySessionTests
|
||||
return session;
|
||||
}
|
||||
|
||||
/// <summary>
/// Builds a session that has event streaming configured, attaches
/// <paramref name="workerClient"/>, and marks the session ready.
/// </summary>
/// <param name="workerClient">The worker client to attach before marking the session ready.</param>
/// <returns>The session, after <c>AttachWorkerClient</c> and <c>MarkReady</c> have been called.</returns>
private static GatewaySession CreateReadySessionWithEventStreaming(IWorkerClient workerClient)
{
    TimeSpan fiveSeconds = TimeSpan.FromSeconds(5);
    SessionEventStreaming streaming = new(
        new MxAccessGrpcMapper(),
        new EventOptions { QueueCapacity = 8 },
        NullLogger<SessionEventDistributor>.Instance,
        TimeProvider.System,
        new GatewayMetrics());

    GatewaySession readySession = new(
        sessionId: "session-test-concurrent",
        backendName: "mxaccess",
        pipeName: "mxaccess-gateway-1-session-test-concurrent",
        nonce: "nonce",
        clientIdentity: "client-1",
        ownerKeyId: null,
        clientSessionName: "test-session",
        clientCorrelationId: "client-correlation-1",
        commandTimeout: fiveSeconds,
        startupTimeout: fiveSeconds,
        shutdownTimeout: fiveSeconds,
        leaseDuration: TimeSpan.FromMinutes(30),
        openedAt: DateTimeOffset.UtcNow,
        eventStreaming: streaming);

    readySession.AttachWorkerClient(workerClient);
    readySession.MarkReady();
    return readySession;
}
|
||||
|
||||
/// <summary>
|
||||
/// Minimal worker client that parks <see cref="ShutdownAsync"/> until the test
|
||||
/// explicitly releases it. Used to keep <see cref="GatewaySession.CloseAsync"/>
|
||||
|
||||
@@ -0,0 +1,569 @@
|
||||
using System.Threading.Channels;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Time.Testing;
|
||||
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
||||
using ZB.MOM.WW.MxGateway.Server.Sessions;
|
||||
|
||||
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;
|
||||
|
||||
/// <summary>
|
||||
/// Concurrency and fan-out tests for <see cref="SessionEventDistributor"/>, the
|
||||
/// Session Resilience epic's per-session event pump. One pump drains the source
|
||||
/// exactly once and fans every event to N independent per-subscriber channels.
|
||||
/// Every async wait is bounded so a fan-out or shutdown deadlock fails fast.
|
||||
/// </summary>
|
||||
public sealed class SessionEventDistributorTests
|
||||
{
|
||||
private static readonly TimeSpan ReadTimeout = TimeSpan.FromSeconds(5);
|
||||
|
||||
/// <summary>
/// Two leases registered on a started distributor each receive both source events,
/// in the order the events were written.
/// </summary>
[Fact]
public async Task TwoSubscribers_BothReceiveFannedEventsInOrder()
{
    Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease first = distributor.Register();
    using IEventSubscriberLease second = distributor.Register();

    for (ulong sequence = 1; sequence <= 2; sequence++)
    {
        source.Writer.TryWrite(Event(sequence));
    }

    // Drain the first lease fully, then the second; each must see 1 then 2.
    foreach (IEventSubscriberLease lease in new[] { first, second })
    {
        for (ulong expected = 1; expected <= 2; expected++)
        {
            MxEvent received = await ReadOneAsync(lease.Reader);
            Assert.Equal(expected, received.WorkerSequence);
        }
    }
}
|
||||
|
||||
/// <summary>
/// Disposing one lease completes that lease's reader, while a second lease on the
/// same distributor keeps receiving events written afterwards.
/// </summary>
[Fact]
public async Task DisposingOneLease_StopsItsDelivery_OtherKeepsReceiving()
{
    Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
    ChannelWriter<MxEvent> writer = source.Writer;
    await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
    await distributor.StartAsync(CancellationToken.None);

    // Deliberately not a `using`: this lease is disposed mid-test.
    IEventSubscriberLease disposedEarly = distributor.Register();
    using IEventSubscriberLease survivor = distributor.Register();

    writer.TryWrite(Event(1));
    await ReadOneAsync(disposedEarly.Reader);
    await ReadOneAsync(survivor.Reader);

    disposedEarly.Dispose();

    // After dispose the first reader must complete: no further delivery.
    await AssertCompletedAsync(disposedEarly.Reader);

    // The surviving lease still receives later events.
    writer.TryWrite(Event(2));
    MxEvent afterDispose = await ReadOneAsync(survivor.Reader);
    Assert.Equal(2ul, afterDispose.WorkerSequence);
}
|
||||
|
||||
/// <summary>
/// A lease registered after an earlier event has already been delivered receives the
/// next event written after its registration as the first item it reads.
/// </summary>
[Fact]
public async Task SubscriberRegisteredAfterStart_ReceivesEventsEmittedAfterRegistration()
{
    Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
    ChannelWriter<MxEvent> writer = source.Writer;
    await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease early = distributor.Register();
    writer.TryWrite(Event(1));
    await ReadOneAsync(early.Reader);

    // Late subscriber: only sees events emitted after it registered.
    using IEventSubscriberLease late = distributor.Register();
    writer.TryWrite(Event(2));

    MxEvent firstSeenByLate = await ReadOneAsync(late.Reader);
    Assert.Equal(2ul, firstSeenByLate.WorkerSequence);
}
|
||||
|
||||
/// <summary>
/// Disposing the distributor finishes within <see cref="ReadTimeout"/> and completes
/// the reader of every registered lease.
/// </summary>
[Fact]
public async Task DisposingDistributor_CompletesAllSubscriberChannels_AndStopsPump()
{
    Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
    SessionEventDistributor distributor = CreateDistributor(source.Reader);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease first = distributor.Register();
    using IEventSubscriberLease second = distributor.Register();

    // Bounded so a shutdown hang fails fast.
    Task disposal = distributor.DisposeAsync().AsTask();
    await disposal.WaitAsync(ReadTimeout);

    foreach (IEventSubscriberLease lease in new[] { first, second })
    {
        await AssertCompletedAsync(lease.Reader);
    }
}
|
||||
|
||||
/// <summary>Verifies that calling Register on a disposed distributor throws ObjectDisposedException.</summary>
[Fact]
public async Task Register_AfterDispose_ThrowsObjectDisposedException()
{
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    SessionEventDistributor distributor = CreateDistributor(upstream.Reader);
    await distributor.StartAsync(CancellationToken.None);

    // Bounded so a disposal hang fails the test instead of stalling it.
    Task disposal = distributor.DisposeAsync().AsTask();
    await disposal.WaitAsync(ReadTimeout);

    Func<object> registerAfterDispose = () => distributor.Register();
    Assert.Throws<ObjectDisposedException>(registerAfterDispose);
}
|
||||
|
||||
/// <summary>Verifies that a capacity-3 replay buffer fed five events replays only sequences 3, 4, 5 and reports a gap to a caller replaying from sequence 0.</summary>
[Fact]
public async Task ReplayBuffer_OverCapacity_EvictsOldestFirst_AndReportsGap()
{
    const ulong lastSequence = 5;
    ulong[] expectedTail = { 3, 4, 5 };

    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 3,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    // Reading all five events back through a live subscriber gives a deterministic
    // point at which the pump is known to have processed event 5.
    using IEventSubscriberLease observer = distributor.Register();
    for (ulong next = 1; next <= lastSequence; next++)
    {
        upstream.Writer.TryWrite(Event(next));
    }

    for (ulong expected = 1; expected <= lastSequence; expected++)
    {
        MxEvent delivered = await ReadOneAsync(observer.Reader);
        Assert.Equal(expected, delivered.WorkerSequence);
    }

    // Events 1 and 2 no longer fit in a capacity-3 buffer, so replaying from 0
    // yields only the retained tail and must signal that events were missed.
    bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);

    Assert.True(found);
    Assert.True(gap);
    Assert.Equal(expectedTail, replay.Select(e => e.WorkerSequence));
}
|
||||
|
||||
/// <summary>Verifies that replaying from sequence 2 of five retained events returns sequences 3, 4, 5 without reporting a gap.</summary>
[Fact]
public async Task ReplayBuffer_WithinRetainedWindow_ReturnsNewerEvents_NoGap()
{
    ulong[] expectedNewer = { 3, 4, 5 };

    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 10,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    // Write and read back one event at a time so each is processed before the next.
    using IEventSubscriberLease observer = distributor.Register();
    for (ulong next = 1; next <= 5; next++)
    {
        upstream.Writer.TryWrite(Event(next));
        _ = await ReadOneAsync(observer.Reader);
    }

    // Capacity 10 holds all of 1..5, so sequence 2 is inside the retained window.
    bool found = distributor.TryGetReplayFrom(2, out IReadOnlyList<MxEvent> replay, out bool gap);

    Assert.True(found);
    Assert.False(gap);
    Assert.Equal(expectedNewer, replay.Select(e => e.WorkerSequence));
}
|
||||
|
||||
/// <summary>Verifies that events older than the 30-second retention window are dropped from the replay buffer once a newer event arrives.</summary>
[Fact]
public async Task ReplayBuffer_AgedEntries_AreEvictedAfterRetentionElapses()
{
    DateTimeOffset startInstant = new(2026, 1, 1, 0, 0, 0, TimeSpan.Zero);
    FakeTimeProvider clock = new(startInstant);
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 100,
        replayRetentionSeconds: 30,
        timeProvider: clock);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease observer = distributor.Register();

    // Events 1 and 2 are recorded at the start instant.
    upstream.Writer.TryWrite(Event(1));
    upstream.Writer.TryWrite(Event(2));
    _ = await ReadOneAsync(observer.Reader);
    _ = await ReadOneAsync(observer.Reader);

    // Move the fake clock to twice the retention window.
    clock.Advance(TimeSpan.FromSeconds(60));

    // Event 3 arrives after the jump; the test expects 1 and 2 to be gone afterwards.
    upstream.Writer.TryWrite(Event(3));
    _ = await ReadOneAsync(observer.Reader);

    bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);

    Assert.True(found);
    // Only event 3 remains; sequence 0 predates the oldest retained entry, hence the gap.
    ulong[] expectedRetained = { 3 };
    Assert.Equal(expectedRetained, replay.Select(e => e.WorkerSequence));
    Assert.True(gap);
}
|
||||
|
||||
/// <summary>Verifies that replaying from the newest retained sequence returns an empty list and no gap.</summary>
[Fact]
public async Task ReplayBuffer_AfterSequenceNewerThanAllRetained_ReturnsEmpty_NoGap()
{
    const ulong newestSequence = 3;

    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 10,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease observer = distributor.Register();
    for (ulong next = 1; next <= newestSequence; next++)
    {
        upstream.Writer.TryWrite(Event(next));
        _ = await ReadOneAsync(observer.Reader);
    }

    // The caller already holds the newest event, so nothing is replayed and nothing was missed.
    bool found = distributor.TryGetReplayFrom(3, out IReadOnlyList<MxEvent> replay, out bool gap);

    Assert.True(found);
    Assert.False(gap);
    Assert.Empty(replay);
}
|
||||
|
||||
/// <summary>Verifies that with replay capacity 0 a caller behind the highest delivered sequence gets a gap and no replayed events.</summary>
[Fact]
public async Task ReplayBuffer_Capacity0_AfterSequenceBelowHighestSeen_ReportsGap_NoEvents()
{
    const ulong highestSequence = 3;

    // Capacity 0 means nothing is available for replay, so a lagging caller
    // can only be told that it missed events.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 0,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease observer = distributor.Register();
    for (ulong next = 1; next <= highestSequence; next++)
    {
        upstream.Writer.TryWrite(Event(next));
        _ = await ReadOneAsync(observer.Reader);
    }

    // Sequence 1 is below the highest delivered sequence (3).
    bool found = distributor.TryGetReplayFrom(1, out IReadOnlyList<MxEvent> replay, out bool gap);

    Assert.True(found);
    Assert.True(gap);
    Assert.Empty(replay);
}
|
||||
|
||||
/// <summary>Verifies that with replay capacity 0 a caller already at the highest delivered sequence gets no gap and no replayed events.</summary>
[Fact]
public async Task ReplayBuffer_Capacity0_AfterSequenceAtOrAboveHighestSeen_NoGap_NoEvents()
{
    const ulong highestSequence = 3;

    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 0,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease observer = distributor.Register();
    for (ulong next = 1; next <= highestSequence; next++)
    {
        upstream.Writer.TryWrite(Event(next));
        _ = await ReadOneAsync(observer.Reader);
    }

    // Sequence 3 equals the highest delivered sequence: the caller is caught up.
    bool found = distributor.TryGetReplayFrom(3, out IReadOnlyList<MxEvent> replay, out bool gap);

    Assert.True(found);
    Assert.False(gap);
    Assert.Empty(replay);
}
|
||||
|
||||
/// <summary>Verifies that a distributor which has never been started reports no gap and an empty replay for sequence 0.</summary>
[Fact]
public async Task ReplayBuffer_NoEventsSeen_AnyAfterSequence_NoGap_NoEvents()
{
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 0,
        replayRetentionSeconds: 0);

    // StartAsync is deliberately not called and nothing is written, so no event
    // has been observed and therefore none can have been missed.
    bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);

    Assert.True(found);
    Assert.False(gap);
    Assert.Empty(replay);
}
|
||||
|
||||
/// <summary>Verifies that replaying from ulong.MaxValue with one retained event yields no gap and an empty replay.</summary>
[Fact]
public async Task ReplayBuffer_AfterSequenceMaxValue_WithRetainedEvents_NoGap_NoNewEvents()
{
    // Regression guard: computing afterSequence + 1 for ulong.MaxValue wraps to 0, and
    // an earlier implementation compared that wrapped value against the oldest retained
    // sequence, wrongly reporting gap=true.
    const ulong afterSequence = ulong.MaxValue;

    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    await using SessionEventDistributor distributor = CreateDistributor(
        upstream.Reader,
        replayBufferCapacity: 10,
        replayRetentionSeconds: 0);
    await distributor.StartAsync(CancellationToken.None);

    using IEventSubscriberLease observer = distributor.Register();
    upstream.Writer.TryWrite(Event(1));
    _ = await ReadOneAsync(observer.Reader);

    bool found = distributor.TryGetReplayFrom(afterSequence, out IReadOnlyList<MxEvent> replay, out bool gap);

    Assert.True(found);
    Assert.False(gap);
    Assert.Empty(replay);
}
|
||||
|
||||
/// <summary>Verifies that a subscriber overflowing its capacity-2 queue is disconnected with EventQueueOverflow while a second subscriber keeps receiving events.</summary>
[Fact]
public async Task SlowSubscriberOverflow_DisconnectsOnlyThatSubscriber_PumpAndOtherKeepRunning()
{
    // Per-subscriber backpressure isolation (Task 5): the stalled subscriber is faulted
    // on its own; the healthy subscriber and the pump carry on.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();

    // The overflow handler runs on the pump side while the test reads the results,
    // so every access to these locals goes through Interlocked/Volatile. The flag
    // value and the "was it recorded" marker are separate variables because a
    // nullable bool cannot be used with Volatile.Read/Write.
    int handlerInvocations = 0;
    int soleSubscriberFlagRecorded = 0;
    bool soleSubscriberFlag = false;

    await using SessionEventDistributor distributor = new(
        "session-test",
        token => upstream.Reader.ReadAllAsync(token),
        subscriberQueueCapacity: 2,
        replayBufferCapacity: 1024,
        replayRetentionSeconds: 0,
        NullLogger<SessionEventDistributor>.Instance,
        TimeProvider.System,
        (soleSubscriber, _) =>
        {
            Interlocked.Increment(ref handlerInvocations);
            Volatile.Write(ref soleSubscriberFlag, soleSubscriber);
            Volatile.Write(ref soleSubscriberFlagRecorded, 1);
        });
    await distributor.StartAsync(CancellationToken.None);

    // Registered but never read: its capacity-2 queue fills up.
    using IEventSubscriberLease stalled = distributor.Register();
    // Read after every write: never falls behind.
    using IEventSubscriberLease responsive = distributor.Register();

    // Ten events exceed the stalled queue's capacity; the responsive reader keeps pace.
    for (ulong next = 1; next <= 10; next++)
    {
        upstream.Writer.TryWrite(Event(next));
        MxEvent delivered = await ReadOneAsync(responsive.Reader);
        Assert.Equal(next, delivered.WorkerSequence);
    }

    // Draining the stalled reader must end in the overflow fault.
    SessionManagerException overflow = await Assert.ThrowsAsync<SessionManagerException>(
        async () => await DrainUntilFaultAsync(stalled.Reader));
    Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, overflow.ErrorCode);

    // Two subscribers were attached when the overflow happened, so the handler must have
    // fired exactly once with isOnlySubscriber == false, leaving one subscriber behind.
    int invocations = Volatile.Read(ref handlerInvocations);
    int flagRecorded = Volatile.Read(ref soleSubscriberFlagRecorded);
    bool flagValue = Volatile.Read(ref soleSubscriberFlag);
    Assert.Equal(1, invocations);
    Assert.Equal(1, flagRecorded);
    Assert.False(flagValue);
    Assert.Equal(1, distributor.SubscriberCount);

    // A further event still reaches the responsive subscriber: the pump is alive.
    upstream.Writer.TryWrite(Event(11));
    MxEvent postOverflow = await ReadOneAsync(responsive.Reader);
    Assert.Equal(11ul, postOverflow.WorkerSequence);
}
|
||||
|
||||
/// <summary>Verifies that when one of two subscribers overflows, the overflow handler sees isOnlySubscriber == false and the other subscriber keeps receiving.</summary>
[Fact]
public async Task SlowSubscriberOverflow_WithMultipleSubscribers_HandlerSeesIsOnlySubscriberFalse_OtherKeepsReceiving()
{
    // Distributor-level pin for "FailFast with multiple subscribers degrades to
    // disconnect-only (no session fault)". The distributor is driven directly, without
    // a GatewaySession, through a FailFast-style handler seam: the handler records which
    // branch it took, and the test asserts (a) isOnlySubscriber == false, (b) the other
    // subscriber keeps receiving, and (c) the pump keeps running.
    //
    // TODO(Task 8): add a GatewaySession-level "session stays Ready" assertion once
    // multi-subscriber config is enabled by the Tasks 7/8 validator/guard change.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    bool degradedBranchTaken = false;
    bool faultBranchTaken = false; // set only if the handler is told it was the sole subscriber

    await using SessionEventDistributor distributor = new(
        "session-multi-sub",
        token => upstream.Reader.ReadAllAsync(token),
        subscriberQueueCapacity: 2,
        replayBufferCapacity: 0,
        replayRetentionSeconds: 0,
        NullLogger<SessionEventDistributor>.Instance,
        TimeProvider.System,
        (soleSubscriber, _) =>
        {
            if (soleSubscriber)
            {
                // Sole subscriber: FailFast would fault the session — must not happen here.
                Volatile.Write(ref faultBranchTaken, true);
            }
            else
            {
                // Several subscribers: FailFast degrades to disconnect-only.
                Volatile.Write(ref degradedBranchTaken, true);
            }
        });
    await distributor.StartAsync(CancellationToken.None);

    // Never reads, so its capacity-2 queue overflows quickly.
    using IEventSubscriberLease stalled = distributor.Register();
    // Reads each event right after it is written.
    using IEventSubscriberLease responsive = distributor.Register();

    // Ten events are more than the stalled subscriber's queue can hold.
    for (ulong next = 1; next <= 10; next++)
    {
        upstream.Writer.TryWrite(Event(next));
        _ = await ReadOneAsync(responsive.Reader);
    }

    // The stalled subscriber ends with the overflow fault.
    SessionManagerException overflow = await Assert.ThrowsAsync<SessionManagerException>(
        async () => await DrainUntilFaultAsync(stalled.Reader));
    Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, overflow.ErrorCode);

    // Only the multi-subscriber (degraded) branch ran; the session-fault branch did not.
    bool sawDegraded = Volatile.Read(ref degradedBranchTaken);
    bool sawFault = Volatile.Read(ref faultBranchTaken);
    Assert.True(sawDegraded);
    Assert.False(sawFault);

    // The responsive subscriber still gets a fresh event, so the pump is unaffected.
    upstream.Writer.TryWrite(Event(11));
    MxEvent postOverflow = await ReadOneAsync(responsive.Reader);
    Assert.Equal(11ul, postOverflow.WorkerSequence);
}
|
||||
|
||||
/// <summary>Verifies that an overflowing internal subscriber is reported to the handler with isOnlySubscriber == false and isInternal == true, even when it is the only registration.</summary>
[Fact]
public async Task InternalSubscriberOverflow_HandlerSeesIsOnlySubscriberFalse_ProvingCountExcludesInternal()
{
    // Issue 3: the internal (dashboard) subscriber is expected to be left out of the
    // external-subscriber count, so a FailFast policy keyed on isOnlySubscriber would
    // not fault the session when only the internal subscriber overflows.
    Channel<MxEvent> upstream = Channel.CreateUnbounded<MxEvent>();
    int handlerRecorded = 0;
    bool recordedSoleSubscriber = false;
    bool recordedInternal = false;

    await using SessionEventDistributor distributor = new(
        "session-internal-overflow",
        token => upstream.Reader.ReadAllAsync(token),
        subscriberQueueCapacity: 2,
        replayBufferCapacity: 0,
        replayRetentionSeconds: 0,
        NullLogger<SessionEventDistributor>.Instance,
        TimeProvider.System,
        (soleSubscriber, internalSubscriber) =>
        {
            Volatile.Write(ref recordedSoleSubscriber, soleSubscriber);
            Volatile.Write(ref recordedInternal, internalSubscriber);
            // Written last so a reader that sees the marker also sees both values.
            Volatile.Write(ref handlerRecorded, 1);
        });
    await distributor.StartAsync(CancellationToken.None);

    // The internal subscriber is the only registration; no external one is attached.
    using IEventSubscriberLease internalLease = distributor.Register(isInternal: true);

    // Ten unread events overflow the capacity-2 queue.
    for (ulong next = 1; next <= 10; next++)
    {
        upstream.Writer.TryWrite(Event(next));
    }

    // The internal subscriber ends with the overflow fault.
    SessionManagerException overflow = await Assert.ThrowsAsync<SessionManagerException>(
        async () => await DrainUntilFaultAsync(internalLease.Reader));
    Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, overflow.ErrorCode);

    // The handler runs on the pump side, so poll (bounded by ReadTimeout) until it has recorded.
    await Task.Run(async () =>
    {
        using CancellationTokenSource deadline = new(ReadTimeout);
        while (Volatile.Read(ref handlerRecorded) == 0)
        {
            await Task.Delay(10, deadline.Token);
        }
    });

    // isOnlySubscriber must be false although nothing else was registered, and the
    // handler must have been told the subscriber was internal.
    Assert.True(Volatile.Read(ref handlerRecorded) == 1, "Overflow handler should have fired.");
    Assert.False(Volatile.Read(ref recordedSoleSubscriber),
        "isOnlySubscriber must be false for an internal subscriber (CountExternalSubscribers excludes it).");
    Assert.True(Volatile.Read(ref recordedInternal),
        "isInternal must be true for a subscriber registered with isInternal: true.");
}
|
||||
|
||||
/// <summary>
/// Reads and discards everything buffered in <paramref name="reader"/> until the channel
/// finishes, surfacing the channel's completion fault (if any) to the caller.
/// </summary>
/// <param name="reader">The subscriber reader to drain.</param>
/// <returns>
/// A task that faults with the channel's completion exception, faults with a timeout if a
/// single wait exceeds <c>ReadTimeout</c>, or completes normally when the channel completed
/// without a fault.
/// </returns>
private static async Task DrainUntilFaultAsync(ChannelReader<MxEvent> reader)
{
    // WaitToReadAsync throws the completion fault once the buffered tail is consumed.
    // It returns false when the channel completed WITHOUT a fault; the loop must stop
    // then, otherwise every later wait returns false immediately and the helper spins
    // forever instead of letting the calling assertion fail.
    while (await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout))
    {
        while (reader.TryRead(out _))
        {
        }
    }
}
|
||||
|
||||
/// <summary>Creates a test distributor over <paramref name="source"/> with replay capacity 1024 and 300 seconds of replay retention.</summary>
private static SessionEventDistributor CreateDistributor(ChannelReader<MxEvent> source)
{
    return CreateDistributor(source, replayBufferCapacity: 1024, replayRetentionSeconds: 300);
}
|
||||
|
||||
/// <summary>Creates a test distributor named "session-test" with a subscriber queue capacity of 64.</summary>
/// <param name="source">Channel reader whose events feed the distributor.</param>
/// <param name="replayBufferCapacity">Replay buffer capacity passed to the distributor.</param>
/// <param name="replayRetentionSeconds">Replay retention, in seconds, passed to the distributor.</param>
/// <param name="timeProvider">Clock to use; <see cref="TimeProvider.System"/> when null.</param>
private static SessionEventDistributor CreateDistributor(
    ChannelReader<MxEvent> source,
    int replayBufferCapacity,
    double replayRetentionSeconds,
    TimeProvider? timeProvider = null)
{
    TimeProvider clock = timeProvider ?? TimeProvider.System;

    return new SessionEventDistributor(
        "session-test",
        token => source.ReadAllAsync(token),
        subscriberQueueCapacity: 64,
        replayBufferCapacity: replayBufferCapacity,
        replayRetentionSeconds: replayRetentionSeconds,
        NullLogger<SessionEventDistributor>.Instance,
        clock);
}
|
||||
|
||||
/// <summary>Builds an event for session "session-test" carrying the given worker sequence.</summary>
private static MxEvent Event(ulong sequence)
{
    return new MxEvent
    {
        SessionId = "session-test",
        WorkerSequence = sequence,
    };
}
|
||||
|
||||
/// <summary>Waits up to <c>ReadTimeout</c> for the reader to become readable, then dequeues one event and asserts that the dequeue succeeded.</summary>
private static async Task<MxEvent> ReadOneAsync(ChannelReader<MxEvent> reader)
{
    // The wait result is not inspected; a closed or empty channel is caught by the TryRead assertion.
    _ = await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout);

    bool dequeued = reader.TryRead(out MxEvent? item);
    Assert.True(dequeued);
    return item!;
}
|
||||
|
||||
/// <summary>
/// Discards anything still buffered in <paramref name="reader"/> and then asserts that the
/// channel has completed, failing with a timeout if it does not finish within <c>ReadTimeout</c>.
/// </summary>
/// <param name="reader">The subscriber reader expected to be completed.</param>
private static async Task AssertCompletedAsync(ChannelReader<MxEvent> reader)
{
    // A channel reader's Completion task does not finish while unread items remain
    // buffered, so consume the tail first. Each wait is bounded so a channel that never
    // completes fails fast.
    while (await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout))
    {
        while (reader.TryRead(out _))
        {
        }
    }

    await reader.Completion.WaitAsync(ReadTimeout);
}
|
||||
}
|
||||
@@ -663,7 +663,7 @@ public sealed class SessionManagerBulkTests
|
||||
private static async Task<GatewaySession> OpenSessionAsync(IWorkerClient workerClient)
|
||||
{
|
||||
SessionManager manager = CreateManager(workerClient);
|
||||
return await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
return await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
}
|
||||
|
||||
private static SessionManager CreateManager(IWorkerClient workerClient)
|
||||
|
||||
@@ -23,7 +23,7 @@ public sealed class SessionManagerTests
|
||||
using GatewayMetrics metrics = new();
|
||||
SessionManager manager = CreateManager(factory, metrics: metrics);
|
||||
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
Assert.True(manager.TryGetSession(session.SessionId, out GatewaySession? registered));
|
||||
Assert.Same(session, registered);
|
||||
@@ -34,6 +34,36 @@ public sealed class SessionManagerTests
|
||||
Assert.Equal(1, metrics.GetSnapshot().SessionsOpened);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that the owner key id passed to OpenSessionAsync is exposed as OwnerKeyId on the returned session.</summary>
[Fact]
public async Task OpenSessionAsync_WithOwnerKeyId_RecordsOwnerKeyIdOnSession()
{
    FakeSessionWorkerClientFactory workerFactory = new(new FakeWorkerClient());
    SessionManager sessions = CreateManager(workerFactory);

    GatewaySession opened = await sessions.OpenSessionAsync(
        CreateOpenRequest(),
        clientIdentity: "MyKey Display",
        ownerKeyId: "key-abc123",
        CancellationToken.None);

    Assert.Equal("key-abc123", opened.OwnerKeyId);
}
|
||||
|
||||
/// <summary>Verifies that a session opened with a null owner key id exposes a null OwnerKeyId.</summary>
[Fact]
public async Task OpenSessionAsync_WithNullOwnerKeyId_RecordsNullOwnerKeyIdOnSession()
{
    FakeSessionWorkerClientFactory workerFactory = new(new FakeWorkerClient());
    SessionManager sessions = CreateManager(workerFactory);

    GatewaySession opened = await sessions.OpenSessionAsync(
        CreateOpenRequest(),
        clientIdentity: null,
        ownerKeyId: null,
        CancellationToken.None);

    Assert.Null(opened.OwnerKeyId);
}
|
||||
|
||||
/// <summary>Verifies that opening a session sets the initial lease expiry from the configured default lease.</summary>
|
||||
[Fact]
|
||||
public async Task OpenSessionAsync_SetsInitialDefaultLease()
|
||||
@@ -45,7 +75,7 @@ public sealed class SessionManagerTests
|
||||
options: options,
|
||||
timeProvider: clock);
|
||||
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
Assert.Equal(clock.GetUtcNow() + TimeSpan.FromMinutes(30), session.LeaseExpiresAt);
|
||||
}
|
||||
@@ -61,7 +91,7 @@ public sealed class SessionManagerTests
|
||||
};
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(new FakeWorkerClient()));
|
||||
|
||||
GatewaySession session = await manager.OpenSessionAsync(request, "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(request, "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
Assert.Equal($"rust-load-client-{session.SessionId}", session.ClientCorrelationId);
|
||||
}
|
||||
@@ -76,7 +106,7 @@ public sealed class SessionManagerTests
|
||||
};
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(new FakeWorkerClient()));
|
||||
|
||||
GatewaySession session = await manager.OpenSessionAsync(request, "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(request, "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
Assert.Equal($"client-{session.SessionId}", session.ClientCorrelationId);
|
||||
}
|
||||
@@ -87,7 +117,7 @@ public sealed class SessionManagerTests
|
||||
{
|
||||
FakeWorkerClient workerClient = new();
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
WorkerCommandReply reply = await manager.InvokeAsync(
|
||||
session.SessionId,
|
||||
@@ -108,6 +138,7 @@ public sealed class SessionManagerTests
|
||||
"mxaccess-gateway-1-session-lease-refresh",
|
||||
"nonce",
|
||||
"client-1",
|
||||
null,
|
||||
"test-session",
|
||||
"client-correlation-1",
|
||||
TimeSpan.FromSeconds(30),
|
||||
@@ -156,7 +187,7 @@ public sealed class SessionManagerTests
|
||||
},
|
||||
};
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
IReadOnlyList<SubscribeResult> results = await session.SubscribeBulkAsync(
|
||||
12,
|
||||
@@ -207,7 +238,7 @@ public sealed class SessionManagerTests
|
||||
},
|
||||
};
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
IReadOnlyList<BulkWriteResult> results = await session.WriteBulkAsync(
|
||||
12,
|
||||
@@ -268,7 +299,7 @@ public sealed class SessionManagerTests
|
||||
},
|
||||
};
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
IReadOnlyList<BulkReadResult> results = await session.ReadBulkAsync(
|
||||
12,
|
||||
@@ -291,7 +322,7 @@ public sealed class SessionManagerTests
|
||||
{
|
||||
FakeWorkerClient workerClient = new();
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
session.MarkFaulted("test fault");
|
||||
|
||||
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
|
||||
@@ -316,7 +347,7 @@ public sealed class SessionManagerTests
|
||||
{
|
||||
FakeWorkerClient workerClient = new();
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
// Force a state mismatch: session stays Ready, worker transitions out.
|
||||
workerClient.State = WorkerClientState.Handshaking;
|
||||
@@ -341,7 +372,7 @@ public sealed class SessionManagerTests
|
||||
FakeWorkerClient workerClient = new();
|
||||
using GatewayMetrics metrics = new();
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient), metrics: metrics);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
SessionCloseResult firstClose = await manager.CloseSessionAsync(session.SessionId, CancellationToken.None);
|
||||
SessionManagerException secondClose = await Assert.ThrowsAsync<SessionManagerException>(
|
||||
@@ -366,7 +397,7 @@ public sealed class SessionManagerTests
|
||||
"Worker shutdown timed out."),
|
||||
};
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
|
||||
async () => await manager.CloseSessionAsync(session.SessionId, CancellationToken.None));
|
||||
@@ -397,6 +428,7 @@ public sealed class SessionManagerTests
|
||||
GatewaySession firstSession = await manager.OpenSessionAsync(
|
||||
CreateOpenRequest(),
|
||||
"client-1",
|
||||
ownerKeyId: null,
|
||||
CancellationToken.None);
|
||||
metrics.EventReceived(firstSession.SessionId, MxEventFamily.OnDataChange.ToString());
|
||||
|
||||
@@ -405,6 +437,7 @@ public sealed class SessionManagerTests
|
||||
GatewaySession secondSession = await manager.OpenSessionAsync(
|
||||
CreateOpenRequest(),
|
||||
"client-2",
|
||||
ownerKeyId: null,
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(SessionManagerErrorCode.CloseFailed, exception.ErrorCode);
|
||||
@@ -440,6 +473,7 @@ public sealed class SessionManagerTests
|
||||
GatewaySession session = await manager.OpenSessionAsync(
|
||||
CreateOpenRequest(),
|
||||
"client-1",
|
||||
ownerKeyId: null,
|
||||
CancellationToken.None);
|
||||
|
||||
Task<SessionCloseResult> firstClose = manager.CloseSessionAsync(session.SessionId, CancellationToken.None);
|
||||
@@ -482,7 +516,7 @@ public sealed class SessionManagerTests
|
||||
FakeWorkerClient workerClient = new();
|
||||
using GatewayMetrics metrics = new();
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient), metrics: metrics);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
SessionCloseResult result = await manager.KillWorkerAsync(session.SessionId, "test-kill", CancellationToken.None);
|
||||
|
||||
@@ -510,7 +544,7 @@ public sealed class SessionManagerTests
|
||||
{
|
||||
FakeWorkerClient workerClient = new();
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
await Assert.ThrowsAsync<ArgumentException>(
|
||||
async () => await manager.KillWorkerAsync(session.SessionId, blankReason, CancellationToken.None));
|
||||
@@ -529,7 +563,7 @@ public sealed class SessionManagerTests
|
||||
{
|
||||
FakeWorkerClient workerClient = new();
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
await Assert.ThrowsAsync<ArgumentNullException>(
|
||||
async () => await manager.KillWorkerAsync(session.SessionId, null!, CancellationToken.None));
|
||||
@@ -569,6 +603,7 @@ public sealed class SessionManagerTests
|
||||
GatewaySession session = await manager.OpenSessionAsync(
|
||||
CreateOpenRequest(),
|
||||
"client-1",
|
||||
ownerKeyId: null,
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(1, metrics.GetSnapshot().OpenSessions);
|
||||
@@ -598,6 +633,7 @@ public sealed class SessionManagerTests
|
||||
GatewaySession session = await manager.OpenSessionAsync(
|
||||
CreateOpenRequest(),
|
||||
"client-1",
|
||||
ownerKeyId: null,
|
||||
CancellationToken.None);
|
||||
|
||||
Task<SessionCloseResult> first = manager.KillWorkerAsync(session.SessionId, "kill-a", CancellationToken.None);
|
||||
@@ -641,6 +677,7 @@ public sealed class SessionManagerTests
|
||||
GatewaySession session = await manager.OpenSessionAsync(
|
||||
CreateOpenRequest(),
|
||||
"client-1",
|
||||
ownerKeyId: null,
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(1, metrics.GetSnapshot().OpenSessions);
|
||||
@@ -666,7 +703,7 @@ public sealed class SessionManagerTests
|
||||
metrics);
|
||||
|
||||
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
|
||||
async () => await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None));
|
||||
async () => await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None));
|
||||
|
||||
Assert.Equal(SessionManagerErrorCode.OpenFailed, exception.ErrorCode);
|
||||
Assert.Equal(0, registry.Count);
|
||||
@@ -682,8 +719,8 @@ public sealed class SessionManagerTests
|
||||
FakeWorkerClient activeClient = new();
|
||||
QueueingSessionWorkerClientFactory factory = new(expiredClient, activeClient);
|
||||
SessionManager manager = CreateManager(factory);
|
||||
GatewaySession expiredSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession activeSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None);
|
||||
GatewaySession expiredSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
GatewaySession activeSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", ownerKeyId: null, CancellationToken.None);
|
||||
DateTimeOffset now = DateTimeOffset.UtcNow;
|
||||
expiredSession.ExtendLease(now.AddSeconds(-1));
|
||||
activeSession.ExtendLease(now.AddMinutes(5));
|
||||
@@ -703,7 +740,7 @@ public sealed class SessionManagerTests
|
||||
{
|
||||
FakeWorkerClient workerClient = new();
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
DateTimeOffset now = DateTimeOffset.UtcNow;
|
||||
session.ExtendLease(now.AddSeconds(-1));
|
||||
using IDisposable eventSubscriber = session.AttachEventSubscriber(allowMultipleSubscribers: false);
|
||||
@@ -724,8 +761,8 @@ public sealed class SessionManagerTests
|
||||
QueueingSessionWorkerClientFactory factory = new(firstClient, secondClient);
|
||||
using GatewayMetrics metrics = new();
|
||||
SessionManager manager = CreateManager(factory, metrics: metrics);
|
||||
GatewaySession firstSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
GatewaySession secondSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None);
|
||||
GatewaySession firstSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
|
||||
GatewaySession secondSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", ownerKeyId: null, CancellationToken.None);
|
||||
|
||||
await manager.ShutdownAsync(CancellationToken.None);
|
||||
|
||||
|
||||
+1
@@ -416,6 +416,7 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
|
||||
public Task<GatewaySession> OpenSessionAsync(
|
||||
SessionOpenRequest request,
|
||||
string? clientIdentity,
|
||||
string? ownerKeyId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
OpenSessionCount++;
|
||||
|
||||
Reference in New Issue
Block a user