Compare commits

...

16 Commits

Author SHA1 Message Date
Joseph Doherty 7b12eebbd1 docs: add MaxEventSubscribersPerSession to config shape example; assert its default
Add MaxEventSubscribersPerSession (value 8) to the Sessions block of the
Configuration Shape JSON example in GatewayConfiguration.md, matching the
appsettings.json default the options table already documents. Assert both
MaxEventSubscribersPerSession (8) and MaxPendingCommandsPerSession (128)
defaults in GatewayOptionsTests.OptionsBinding_UsesDesignDefaults.
2026-06-15 15:16:44 -04:00
Joseph Doherty bd190ab012 feat(config): allow multiple event subscribers + add MaxEventSubscribersPerSession cap
Remove the hard-rejection of AllowMultipleEventSubscribers=true in GatewayOptionsValidator
(fan-out is now implemented via SessionEventDistributor). Add MaxEventSubscribersPerSession
(default 8, must be >= 1) to SessionOptions, validate it, expose it in
EffectiveSessionConfiguration / GatewayConfigurationProvider, document it in
GatewayConfiguration.md and appsettings.json. Tests cover the no-error path for
AllowMultipleEventSubscribers=true, the 0/-1 rejection, positive pass, and default pass.
2026-06-15 15:13:21 -04:00
Joseph Doherty 2ead9bc200 fix(dashboard): close StartDashboardMirror/DisposeAsync race; internal-overflow test + metric label
(1) GatewaySession.StartDashboardMirror: publish _dashboardMirrorLease and _dashboardMirrorTask
    atomically under one _syncRoot section; if the session is already Closing/Closed/Faulted,
    dispose the just-created lease and return without starting the mirror task so nothing is orphaned.
(2) WaitUntilAsync test helper: catch OperationCanceledException and call Assert.Fail with the
    timeout duration and predicate source text instead of letting the exception propagate raw.
(3) New SessionEventDistributorTests.InternalSubscriberOverflow_HandlerSeesIsOnlySubscriberFalse:
    verifies CountExternalSubscribers excludes the internal subscriber, so isOnlySubscriber==false
    even when the internal subscriber is the only registered subscriber.
(4) SubscriberOverflowHandler delegate gains isInternal parameter; overflow metric label is
    "dashboard-mirror" for internal subscribers and "grpc-event-stream" for external ones.
(5) DashboardEventBroadcaster.Publish: wrap SendAsync Task acquisition in try/catch so a
    synchronous throw cannot escape the never-throw Publish interface contract.
2026-06-15 15:02:36 -04:00
Joseph Doherty 1ea08c3b10 feat(dashboard): mirror events via SessionEventDistributor subscriber (fixes dark feed without gRPC client) 2026-06-15 14:42:32 -04:00
Joseph Doherty 4f43733b96 test(sessions): document overflow race safety + close backpressure coverage gaps
- Issue 1: document the isOnlySubscriber snapshot race-safety assumption in
  OnSubscriberOverflow; flags the Task 7/8 revisit point explicitly.
- Issue 2: pin StreamDisconnects==1 in the FailFast overflow test so a
  regression dropping the StreamDisconnected("Detached") finally call is caught.
- Issue 3: replace plain int/bool? reads in SlowSubscriberOverflow test with
  Volatile.Read/Write + Interlocked.Increment stores to close the C# memory
  model data race on overflowCalls and observedIsOnlySubscriber.
- Issue 4: add SlowSubscriberOverflow_WithMultipleSubscribers_... distributor
  test pinning that isOnlySubscriber==false disables the session-fault path;
  includes TODO(Task 8) note for the GatewaySession-level assertion.
- Issue 5: reword SubscriberOverflowHandler XML doc to make explicit that the
  handler must NOT complete the subscriber's channel; the distributor owns that.
2026-06-15 13:46:37 -04:00
Joseph Doherty 039111ca05 feat(sessions): per-subscriber backpressure isolation in SessionEventDistributor 2026-06-15 13:39:25 -04:00
Joseph Doherty 61627fc5b0 fix(sessions): make EventSubscriberLease dispose atomic; dedupe lease dispose
Issue 1: replace plain bool _disposed in EventSubscriberLease with an
Interlocked.Exchange int (_leaseDisposed) matching the SubscriberLease
pattern in SessionEventDistributor. Concurrent stream-completion +
client-cancellation racing Dispose() now decrements _activeEventSubscriberCount
exactly once, never to -1.

Issue 5: remove the `using` declaration on the subscriber lease in
EventStreamService.StreamEventsAsync; the finally block already disposes it
alongside the reader, so the using was a redundant second dispose on the
same code path.

Issue 2: add an inline comment at the StartAsync().GetAwaiter().GetResult()
call documenting the sync-over-async invariant (StartAsync only schedules via
Task.Run and is synchronous; do not make it truly async without changing
this call site).

Issue 10: remove the redundant .WithCancellation(cancellationToken) chained
on ReadEventsAsync(cancellationToken) in MapWorkerEventsAsync; the
[EnumeratorCancellation] token already flows through the direct argument.

Issue 9: add EventSubscriberLease_ConcurrentDispose_DecrementsCountExactlyOnce
to GatewaySessionTests — 16 concurrent Dispose() calls on the same lease for
200 iterations; asserts count is exactly 0 after each race and a subsequent
single-subscriber AttachEventSubscriber succeeds.
2026-06-15 13:29:27 -04:00
Joseph Doherty 7f1018bac1 feat(sessions): route event streaming through SessionEventDistributor 2026-06-15 13:18:28 -04:00
Joseph Doherty c2c518862f fix(sessions): replay-buffer gap edge cases, effective-config exposure, capacity-0 tests
#2: Replace afterSequence+1<oldestRetained with overflow-safe oldestRetained>0&&afterSequence<oldestRetained-1 to prevent ulong wrap at MaxValue falsely reporting gap=true.
#3: Add ReplayBufferCapacity and ReplayRetentionSeconds to EffectiveEventConfiguration and populate from EventOptions in GatewayConfigurationProvider.
#4: Add four new SessionEventDistributorTests covering capacity=0 gap/no-gap paths and the ulong.MaxValue boundary case.
#5: Update class-level <remarks> to describe the Task 3 replay ring buffer (capacity + age eviction, TryGetReplayFrom) rather than its absence.
#6: Add O(n)-is-acceptable comment at TryGetReplayFrom linear scan.
#8: Narrow no-replay 4-arg ctor to internal; InternalsVisibleTo already covers the test project.
2026-06-15 12:48:11 -04:00
Joseph Doherty e962737d2c feat(sessions): add bounded replay ring buffer to SessionEventDistributor 2026-06-15 12:42:15 -04:00
Joseph Doherty 7773bdebbd fix(sessions): close SessionEventDistributor dispose/register races + add overflow logging 2026-06-15 12:37:39 -04:00
Joseph Doherty c79b292968 feat(sessions): add SessionEventDistributor (pump + per-subscriber fan-out skeleton) 2026-06-15 12:32:13 -04:00
Joseph Doherty a43b2ee6af test(sessions): cover OwnerKeyId service-layer forwarding; doc 11-param ctor
Add LastOwnerKeyId capture to FakeSessionManager and assert it equals
"operator01" in OpenSession_WithValidRequest_ReturnsSessionDetails, closing
the gap where OwnerKeyId threading through the service layer had no test
coverage. Add a <remarks> to the 11-param GatewaySession convenience ctor
documenting that OwnerKeyId is null there and authenticated call sites must
use the 12-param overload.
2026-06-15 12:29:16 -04:00
Joseph Doherty f5479f3ca3 feat(sessions): record OwnerKeyId on session creation
Add a nullable string? OwnerKeyId property to GatewaySession that captures
the API key identifier (KeyId) of the authenticated caller that opened the
session. Wire it through ISessionManager.OpenSessionAsync → SessionManager
→ GatewaySession constructor. The gRPC service passes identityAccessor
.Current?.KeyId; internal callers (GatewayAlarmMonitor, DashboardLiveDataService)
pass null. Covers the positive and null cases with two new TDD-first tests.
2026-06-15 12:24:29 -04:00
Joseph Doherty 00c849e63b docs: session-resilience implementation plan (28 tasks, 5 phases) 2026-06-15 12:15:34 -04:00
Joseph Doherty 3fc6ccad30 docs: session-resilience epic design (fan-out, reconnect, ACL, reattach) 2026-06-15 12:11:46 -04:00
38 changed files with 3256 additions and 315 deletions
+26 -11
View File
@@ -37,11 +37,14 @@ paths, timeouts, queue sizes, enum values, or protocol values are invalid.
"MaxPendingCommandsPerSession": 128,
"DefaultLeaseSeconds": 1800,
"LeaseSweepIntervalSeconds": 30,
"AllowMultipleEventSubscribers": false
"AllowMultipleEventSubscribers": false,
"MaxEventSubscribersPerSession": 8
},
"Events": {
"QueueCapacity": 10000,
"BackpressurePolicy": "FailFast"
"BackpressurePolicy": "FailFast",
"ReplayBufferCapacity": 1024,
"ReplayRetentionSeconds": 300
},
"Dashboard": {
"Enabled": true,
@@ -123,23 +126,35 @@ to avoid accidental large allocations from malformed or oversized frames.
| `MxGateway:Sessions:MaxPendingCommandsPerSession` | `128` | Maximum number of pending worker commands for one session. Excess commands fail fast instead of queueing indefinitely. |
| `MxGateway:Sessions:DefaultLeaseSeconds` | `1800` | Initial session lease and refresh duration. Unary client activity extends the lease by this duration. |
| `MxGateway:Sessions:LeaseSweepIntervalSeconds` | `30` | Hosted monitor interval for closing expired leases. Active event-stream subscribers keep a session from expiring while the stream remains attached. |
| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. `true` is rejected until event fan-out is implemented. |
| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. When `false` the session refuses a second subscriber with `AlreadyExists`. Set to `true` to enable fan-out via the `SessionEventDistributor`. |
| `MxGateway:Sessions:MaxEventSubscribersPerSession` | `8` | Maximum number of concurrent `StreamEvents` subscribers per session when `AllowMultipleEventSubscribers` is `true`. Effectively 1 when `AllowMultipleEventSubscribers` is `false`. Must be greater than zero. |
All numeric session options must be greater than zero. The current event stream
implementation supports one active subscriber per session; this preserves event
ordering and avoids competing consumers.
All numeric session options must be greater than zero.
## Event Options
| Option | Default | Description |
|--------|---------|-------------|
| `MxGateway:Events:QueueCapacity` | `10000` | Capacity for bounded per-session event queues used by the gateway worker event channel and the public gRPC event stream queue. |
| `MxGateway:Events:BackpressurePolicy` | `FailFast` | Event backpressure behavior. `FailFast` faults the session on public stream queue overflow. `DisconnectSubscriber` disconnects only the slow stream. |
| `MxGateway:Events:BackpressurePolicy` | `FailFast` | Per-subscriber event backpressure behavior when a subscriber's bounded event channel overflows. Overflow is isolated to the offending subscriber: it is always disconnected with an `EventQueueOverflow` fault while the session pump and other subscribers keep running. `FailFast` additionally faults the whole session only in the legacy single-subscriber case (the current default mode); with multiple subscribers it degrades to a per-subscriber disconnect so one slow consumer never faults a shared session. `DisconnectSubscriber` disconnects only the slow subscriber in all cases. |
| `MxGateway:Events:ReplayBufferCapacity` | `1024` | Maximum number of events retained per session in the replay ring buffer, used to re-deliver events a returning subscriber missed (reconnect/reattach). The oldest retained event is evicted once this count is exceeded. `0` disables replay retention. |
| `MxGateway:Events:ReplayRetentionSeconds` | `300` | Maximum age, in seconds, of an event retained in the replay ring buffer. Entries older than this are evicted regardless of capacity. `0` disables age-based eviction. |
`QueueCapacity` must be greater than zero. With `FailFast`, queue overflow
faults the affected worker or session instead of silently dropping MXAccess
events. With `DisconnectSubscriber`, public gRPC stream overflow terminates only
the affected stream while the MXAccess session remains active.
`QueueCapacity` must be greater than zero; it bounds each per-subscriber event
channel fed by the session's single event pump. A slow subscriber overflows only
its own channel and is always disconnected with an `EventQueueOverflow` fault
rather than silently dropping MXAccess events — the pump, the session, and other
subscribers are unaffected. With `FailFast` in the single-subscriber case (the
default mode), that overflow additionally faults the whole session; with multiple
subscribers `FailFast` degrades to a per-subscriber disconnect, matching
`DisconnectSubscriber`, so one slow consumer cannot fault a session shared by
healthy subscribers. With `DisconnectSubscriber`, overflow terminates only the
affected stream while the MXAccess session remains active.
`ReplayBufferCapacity` and `ReplayRetentionSeconds` must each be greater than or
equal to zero (either dimension can be disabled with `0`). A returning subscriber
that asks for events older than the oldest still-retained event is told it missed
events (a "gap") and must re-snapshot; whatever is still retained is replayed.
## Dashboard Options
+3 -2
View File
@@ -167,7 +167,7 @@ bearer). Each hub class is `[Authorize(Policy = HubClientsPolicy)]`.
|---|---|---|---|---|
| `DashboardSnapshotHub` | `/hubs/snapshot` | `DashboardSnapshotPublisher` (BackgroundService consuming `IDashboardSnapshotService.WatchSnapshotsAsync`) | `DashboardSnapshot` | Sent to all connected clients on every snapshot tick; new connections receive the current snapshot synchronously in `OnConnectedAsync`. |
| `AlarmsHub` | `/hubs/alarms` | `AlarmsHubPublisher` (BackgroundService consuming `IGatewayAlarmService.StreamAsync(filter: null)`) | `AlarmFeedMessage` (`active_alarm` / `snapshot_complete` / `transition`) | Connected clients auto-join `__alarms__`; all clients receive every message. Publisher auto-reconnects every 5s on stream faults. |
| `EventsHub` | `/hubs/events` | `DashboardEventBroadcaster` invoked by `EventStreamService` for each event it forwards to a gRPC client | `MxEvent` | Clients call `SubscribeSession(sessionId)` to join `session:{id}`. Events appear only while a gRPC client is also consuming that session's events — the dashboard is a passive mirror, not a separate worker subscriber. |
| `EventsHub` | `/hubs/events` | `DashboardEventBroadcaster` invoked by each session's internal dashboard-mirror subscriber on its `SessionEventDistributor` (registered when the session becomes Ready) | `MxEvent` | Clients call `SubscribeSession(sessionId)` to join `session:{id}`. The dashboard is a first-class distributor subscriber, so it receives the session's events whether or not a gRPC client is streaming. It sees RAW session events — not the per-gRPC-subscriber `AfterWorkerSequence` filtering that `EventStreamService` applies at its own boundary — because the dashboard is a separate LDAP-authenticated monitoring view meant to show the session's full event activity (per-session dashboard ACL is tracked separately). |
`DashboardPageBase` opens a `DashboardSnapshotHub` connection via the connection
factory in `OnInitializedAsync`, seeds `Snapshot` synchronously from
@@ -184,7 +184,8 @@ Default cadences:
- snapshot service produces one snapshot per
`MxGateway:Dashboard:SnapshotIntervalMilliseconds` (default 1s);
- alarm publisher emits on each transition observed by the central monitor;
- event publisher emits per event forwarded by `StreamEvents`.
- event publisher emits per event fanned by the session's `SessionEventDistributor`
to its internal dashboard-mirror subscriber (independent of any gRPC `StreamEvents`).
Avoid pushing every MXAccess data-change event into a wider broadcast group.
The current design routes events strictly through `session:{id}` groups; the
@@ -0,0 +1,233 @@
# Session Resilience Epic — Design
**Date:** 2026-06-15
**Branch:** `feat/session-resilience`
**Source:** `stillpending.md` §2 (intentional v1-deferred items), scoped into a real feature design.
**Status:** Design approved; implementation plan to follow.
## Goal
Lift four deliberately-deferred v1 limitations into supported features, built on
one shared foundation:
1. **Multi-event-subscriber fan-out** (§2: plumbed but validator-blocked).
2. **Reconnectable sessions** (§2: 1:1 session↔connection today).
3. **Per-session dashboard ACL** (§2 / EventsHub `TODO(per-session-acl)`).
4. **Orphan-worker reattach on gateway restart** (§2 — **overturns a hard
CLAUDE.md rule**, see "Documented-rule changes").
These are not peers: fan-out is the keystone, reconnect and reattach reuse its
machinery, and three of the four need a new session-ownership concept.
## Documented-rule changes (explicit, owner-approved)
This epic deliberately reverses three documented v1 decisions. Each reversal is a
required deliverable in the same change as the code:
- **CLAUDE.md:77** "Gateway restart does not reattach orphan workers… do not
design code paths that assume reattachment." → reattach becomes supported,
bounded, and opt-in.
- **`docs/DesignDecisions.md:63-73`** "no reconnectable sessions for v1." →
reconnect becomes supported within a bounded detach-grace window.
- **`docs/DesignDecisions.md:75-80`** single event subscriber per session. →
multi-subscriber fan-out becomes supported, capped.
The owner explicitly accepted overturning the reattach rule during design.
## Current-state seams (verified by recon, with citations)
- `GatewaySession.AttachEventSubscriber(bool allowMultipleSubscribers)`
(`Sessions/GatewaySession.cs:386-408`) guards on a single int
`_activeEventSubscriberCount` (`:16`) under `_syncRoot`; a second subscriber
throws `EventSubscriberAlreadyActive` (`:398`).
- `GatewayOptionsValidator.cs:181-185` hard-rejects
`AllowMultipleEventSubscribers` ("not supported until event fan-out is
implemented"); option bound at `SessionOptions.cs:26-29`.
- `EventStreamService.StreamEventsAsync` (`Grpc/EventStreamService.cs:27-101`)
creates **a new bounded `Channel<MxEvent>` per RPC call** (`:43-50`) and
`ProduceEventsAsync` drains `session.ReadEventsAsync()` directly — a
**destructive, single-consumer drain**. Two RPCs would fight over one queue.
- Backpressure: `ProduceEventsAsync` uses non-blocking `TryWrite`; on overflow
with `EventBackpressurePolicy.FailFast` (default, `EventOptions`) it calls
`session.MarkFaulted` (`EventStreamService.cs:143-162`) — faulting the **whole
session**, not just the slow consumer.
- `DashboardEventBroadcaster.Publish` (`Dashboard/Hubs/DashboardEventBroadcaster.cs:13-44`)
is called **inside** the per-RPC producer loop (`EventStreamService.cs:131-141`)
— so the dashboard only mirrors events while a gRPC client is streaming. Latent
bug: no gRPC subscriber ⇒ dashboard feed is dark.
- Pipe name `mxaccess-gateway-{Environment.ProcessId}-{sessionId}`
(`SessionManager.cs:433`); session id `session-{Guid:N}` (`:479`), in-memory
`SessionRegistry` only (`SessionRegistry.cs:12`), **not persisted**.
- `OrphanWorkerTerminator` (`Workers/OrphanWorkerTerminator.cs:49-112`) discovers
orphans by executable name/path (x64 gateway cannot introspect the x86 worker
module → image-name fallback) and **terminates** them; rationale comment at
`:9-16`.
- Pipe fault → `WorkerClient` read loop detects `EndOfStream`, session →
`Faulted` (`WorkerClient.cs:376-381`); no reattach. Worker launch passes the
per-session nonce via `MXGATEWAY_WORKER_NONCE` env var
(`WorkerProcessLauncher.cs:180-182`).
- Sessions store `ClientIdentity` (informational only, `GatewaySession.cs:114`);
**no `OwnerKeyId`, no per-session ACL.** gRPC `StreamEvents` enforces per-item
read constraints but **no session-level access gate** — any caller who knows a
session id can stream it.
- `EventsHub.SubscribeSession(string)` (`Dashboard/Hubs/EventsHub.cs:46-54`) joins
group `session:{id}`; only hub-level `[Authorize(HubClientsPolicy)]` gates it,
so **any** Admin/Viewer can subscribe to **any** session. `TODO(per-session-acl)`
at `:39-43`. `SnapshotHub`/`AlarmsHub` broadcast to all. Hub bearer
(`HubTokenService`, 30-min) carries name + roles only, **no session scope**.
- `StreamEventsRequest.AfterWorkerSequence` already exists on the wire (the
reconnect replay contract is half-built).
## Shared foundation
### A. `SessionEventDistributor` (one pump, N per-subscriber channels)
Per `GatewaySession`, replace the per-RPC direct drain with a single owned
distributor:
- One background **pump task** drains `ReadEventsAsync()` exactly once.
- Each event is (1) stamped with its worker sequence, (2) appended to a **bounded
replay ring buffer** (retain last `ReplayBufferCapacity` events or
`ReplayRetentionSeconds`, whichever first), and (3) `TryWrite`-fanned to every
registered subscriber's own bounded `Channel<MxEvent>`.
- **Per-subscriber backpressure isolation:** overflow completes only that
subscriber's channel (policy `DisconnectSubscriber`); the session and peers are
untouched. `FailFast``MarkFaulted` is retained only for the legacy
single-subscriber config path, for backward compatibility.
- **Constraint filtering stays per-subscriber:** the pump fans *raw* events; each
subscriber's read loop applies its own API-key read subtree/glob filter exactly
as today. No change to constraint semantics.
- `AttachEventSubscriber` returns a lease carrying that subscriber's channel
reader + its start sequence (for replay). `EventStreamService` reads the lease
channel instead of creating its own channel and draining the session.
### B. Session ownership
Record an authoritative **`OwnerKeyId`** (the creating API key id) on the session
at `OpenSession`, alongside the existing informational `ClientIdentity`. This one
field underpins ACL, reconnect re-validation, and reattach adoption.
## Feature designs
### 1. Multi-subscriber fan-out
- Remove the `GatewayOptionsValidator.cs:181-185` rejection; keep the option but
allow `true`.
- `_activeEventSubscriberCount` → a subscriber-lease collection on the
distributor. New cap `MaxEventSubscribersPerSession` (default 8) → reject the
N+1 attach with `EventSubscriberLimitReached`.
- Dashboard broadcaster registers as a distributor subscriber (removing the inline
tap), fixing the dashboard-dark-without-gRPC bug.
- **No proto change.**
### 2. Reconnectable sessions
- On stream drop, a session in **detach-grace** mode is retained (not closed) for
`DetachGraceSeconds` (separate from the session lease). New session
disconnect-policy value `DetachGrace`.
- On reconnect: client calls `StreamEvents` with the same session id +
`AfterWorkerSequence = lastSeen`. The distributor replays ring-buffer events
with `sequence > AfterWorkerSequence`, then resumes live.
- If the requested sequence is older than the ring's oldest retained event (gone
too long / ring overflowed), the server signals **`ReplayGap`** so the client
re-snapshots. **Contract addition** (a `ReplayGap` status / response marker) →
codegen ripple across all 5 clients.
- Reconnect re-validates caller `OwnerKeyId` == session owner → else
`PermissionDenied`.
### 3. Per-session ACL
- **gRPC (real security win, no proto change):** `Invoke` / `StreamEvents` /
`CloseSession` gated to the owning API key, OR a key holding a new
all-sessions admin scope → else `PermissionDenied`. Enforced in
`MxAccessGatewayService` against session `OwnerKeyId`.
- **Dashboard:** identity-domain mismatch (LDAP Admin/Viewer users vs API-key
sessions) means no natural owner link.
- **Decision required (flagged, not hard-coded):** default proposal —
**Admin** sees all sessions; **Viewer** scoped via config
`Dashboard:GroupToSessionTag` matched against an optional session `Tag`.
Enforced at `EventsHub.SubscribeSession` and in the `/hubs/token` mint
(token gains an allowed-session-tag claim). The owner may instead choose a
strict default (Viewers see nothing unless granted).
### 4. Orphan-worker reattach
- **Stable pipe naming:** drop `{gatewayPid}`; use a persisted stable
gateway-instance id. Replaces the pid's collision-avoidance role.
- **Adoption manifest:** persist a minimal record per live session
(`sessionId → workerPid, nonce, ownerKeyId, pipeName`) in the existing SQLite
store. This is the *only* persisted session state; COM/advise state stays in
the worker.
- **Worker phones home:** the worker runs a reconnect loop with bounded backoff;
the restarted gateway re-opens pipe servers for manifest entries and the
surviving worker re-attaches, presenting its **nonce**. Gateway validates the
nonce against the manifest and **rejects impostors / foreign workers**.
- **Resync, not replay:** the in-memory ring buffer is lost on restart, so a
reattached session's subscribers get `ReplayGap` and re-snapshot. Gateway
resyncs worker view via the now-implemented `GetSessionState` / `GetWorkerInfo`
commands.
- **Safety net retained:** workers self-terminate after `MaxOrphanLifetime` with
no re-adoption; `OrphanWorkerTerminator` stays as the fallback for un-adoptable
or foreign workers. Reattach is opt-in (`Workers:EnableOrphanReattach`,
default off) so the documented-safe behavior remains the default.
- **Pipe protocol:** add an **adopt/reconnect frame** to `mxaccess_worker.proto`
→ worker codegen regen + commit `Generated/` (net48 regen rule applies).
## Contract / codegen impact
Unlike the prior epic, this is **not zero-proto**:
- `mxaccess_gateway.proto``ReplayGap` signal for reconnect (Feature 2).
- `mxaccess_worker.proto` — adopt/reconnect frame (Feature 4).
Per the repo rule: regenerate `Generated/`, commit it, rebuild gateway + worker +
every generated client touched, and update affected docs in the same change.
## Error handling
- Per-subscriber overflow → disconnect that subscriber only; session survives.
- Reconnect past the ring horizon → `ReplayGap`, client re-snapshots (no silent
loss).
- Reattach nonce mismatch → reject + fall back to termination.
- ACL denial → `PermissionDenied` (gRPC) / hub subscribe refused (dashboard).
- All worker COM/STA interactions keep MXAccess parity — no synthesized events,
no "fixing" surprising returns.
## Testing & cross-platform verification
| Area | Test | Host |
|---|---|---|
| Distributor fan-out, per-sub backpressure, replay ring | unit, `FakeWorkerHarness` | local (macOS) |
| Reconnect replay + `ReplayGap` | unit + fake-worker integration | local |
| Session ownership / gRPC ACL | unit + gateway integration | local |
| Dashboard per-session ACL | LDAP test users (`multi-role`/`gw-viewer`) | local + live LDAP |
| Worker adopt frame, reattach handshake | worker unit (net48/x86) | **windev** |
| Gateway-restart reattach round-trip | integration | **windev** + live worker |
| Client `ReplayGap` handling | per-client tests; Java on macOS JDK 21 | local |
TDD throughout; per-task commits; `Generated/` regenerated+committed on proto
changes; docs (incl. the three documented-rule reversals) updated in the same
change as source.
## Delivery order (dependency stack)
Each phase is independently shippable:
1. **Foundation**`SessionEventDistributor` + replay ring + session `OwnerKeyId`
(refactor with no external behavior change; dashboard-dark bug fixed).
2. **Fan-out** — remove validator block, subscriber-lease list, cap, dashboard as
subscriber.
3. **Reconnect** — detach-grace, replay-on-reconnect, `ReplayGap` contract +
client handling.
4. **Per-session ACL** — gRPC owner gate + dashboard scoping.
5. **Reattach** — stable pipe naming, adoption manifest, worker phone-home +
adopt frame, resync, safety net; documented-rule reversals.
## Out of scope
- Cross-gateway / clustered session sharing (single gateway instance only).
- Event persistence beyond the in-memory ring (no durable event log).
- Reconnect across a gateway *restart* with zero event gap (restart always yields
`ReplayGap` by design — the ring is in-memory).
- Per-session ACL on `SnapshotHub` / `AlarmsHub` (they broadcast aggregate state;
only `EventsHub` is session-scoped).
+417
View File
@@ -0,0 +1,417 @@
# Session Resilience Epic — Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:subagent-driven-development (same session) or executing-plans (parallel session) to implement this plan task-by-task.
**Goal:** Lift four deferred v1 limitations — multi-subscriber fan-out, reconnectable sessions, per-session ACL, orphan-worker reattach — onto one shared event-distribution foundation.
**Architecture:** A per-session `SessionEventDistributor` (one pump → N per-subscriber bounded channels + a bounded replay ring) replaces today's per-RPC destructive drain. Session ownership (`OwnerKeyId`) underpins ACL, reconnect re-validation, and reattach adoption. See `docs/plans/2026-06-15-session-resilience-design.md`.
**Tech Stack:** .NET 10 gateway (x64), .NET Framework 4.8 worker (x86, windev), SQLite auth/manifest store, gRPC + protobuf contracts (net10.0;net48), 5 language clients, Blazor/SignalR dashboard, LDAP dashboard auth.
**Cross-platform:** Gateway, dotnet/Go/Rust/Python clients, and the Java client build/test locally on macOS (JDK 21 at `~/.local/jdks/jdk-21.0.11+10/Contents/Home`). The net48/x86 worker and worker tests build/test on **windev** (ssh alias, PowerShell). Proto changes: regenerate `Generated/`, commit it, rebuild every touched component.
**Standing rules (from CLAUDE.md):** never log secrets/credentials/values; MXAccess parity (no synthesized events, no "fixing" surprising returns); no init-only props/positional records in net48 worker; update docs in the same change as source; branch already created (`feat/session-resilience`); per-task commits; build+test affected components before marking done.
---
## Phase 1 — Foundation (refactor; no external behavior change except the dashboard-dark fix)
### Task 1: Add `OwnerKeyId` to the session
**Classification:** small
**Estimated implement time:** ~4 min
**Parallelizable with:** none (other phase-1 tasks build on the session type)
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs` (add `OwnerKeyId` readonly prop near `ClientIdentity:114`)
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs` (set `OwnerKeyId` from the request identity at `OpenSession`, near `CreateSessionId:479`)
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs`
**Steps:** TDD — failing test asserting an opened session records the creating API key id → add the property + assignment from `IGatewayRequestIdentityAccessor.Current` → green → `dotnet build src/ZB.MOM.WW.MxGateway.Server` + run session tests → commit.
### Task 2: `SessionEventDistributor` skeleton (single pump, subscriber registry)
**Classification:** high-risk (concurrency / actor model)
**Estimated implement time:** ~5 min
**Parallelizable with:** none
**Files:**
- Create: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs`
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs`
**Design:** One background pump `Task` draining `session.ReadEventsAsync()` exactly once; a thread-safe subscriber collection where each subscriber owns a bounded `Channel<MxEvent>` (`SingleReader=true`, `FullMode=Wait` for the per-sub channel, but writes use non-blocking `TryWrite`). `Register(startSequence)` returns a lease (channel reader + dispose). Pump fans each drained event to all subscriber channels via `TryWrite`.
**Steps:** Failing test: two registered subscribers both receive the same fanned event; disposing one stops its delivery without affecting the other. Implement pump + registry. Green. Build + test. Commit.
### Task 3: Bounded replay ring buffer
**Classification:** standard
**Estimated implement time:** ~4 min
**Parallelizable with:** none (extends Task 2)
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs`
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/EventOptions.cs` (add `ReplayBufferCapacity`, `ReplayRetentionSeconds`)
- Test: `SessionEventDistributorTests.cs`
**Design:** Append each fanned event to a ring keyed by worker sequence, evicting by count (`ReplayBufferCapacity`) or age (`ReplayRetentionSeconds`), whichever first. Expose `TryGetReplayFrom(afterSequence, out events, out gap)`.
**Steps:** Failing test: events evicted past capacity; `TryGetReplayFrom` returns `gap=true` when requested sequence is older than the oldest retained. Implement. Green. Build+test. Commit.
### Task 4: Rewire `AttachEventSubscriber` + `EventStreamService` onto the distributor
**Classification:** high-risk (changes the live event path)
**Estimated implement time:** ~5 min
**Parallelizable with:** none
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs:386-408` (own a `SessionEventDistributor`; `AttachEventSubscriber` returns a lease wrapping `distributor.Register(...)`)
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs:27-101` (read the lease's channel instead of creating a per-RPC channel and draining the session directly; remove the per-RPC `Channel.CreateBounded` at `:43-50`)
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs`
**Steps:** Failing test: a single subscriber still streams events end-to-end through the distributor (regression parity with today). Rewire. Keep per-item constraint filtering in the subscriber read loop. Green. Build + run gateway event-stream tests. Commit.
### Task 5: Per-subscriber backpressure isolation
**Classification:** standard
**Estimated implement time:** ~4 min
**Parallelizable with:** none (extends Tasks 2/4)
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs`
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs` (overflow path `:143-162`)
- Test: `SessionEventDistributorTests.cs`
**Design:** On a subscriber channel `TryWrite` failure, complete only that subscriber's channel with `EventQueueOverflow` (policy `DisconnectSubscriber`). Retain `FailFast``MarkFaulted` only when the session is in legacy single-subscriber mode (back-compat).
**Steps:** Failing test: a slow subscriber overflows and is disconnected while a second subscriber keeps receiving and the session stays `Ready`. Implement. Green. Build+test. Commit.
### Task 6: Dashboard broadcaster becomes a distributor subscriber
**Classification:** standard
**Estimated implement time:** ~4 min
**Parallelizable with:** none
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs:131-141` (remove the inline `dashboardEventBroadcaster.Publish` tap)
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs` (register the dashboard broadcaster as a distributor subscriber on session start)
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardEventBroadcaster.cs` (consume from a distributor lease)
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardEventBroadcasterTests.cs`
**Steps:** Failing test: dashboard receives session events even with **no** active gRPC subscriber (fixes the latent dark-feed bug). Implement. Green. Build + dashboard tests. Commit.
---
## Phase 2 — Multi-subscriber fan-out
### Task 7: Remove the validator block + add the subscriber cap option
**Classification:** small
**Estimated implement time:** ~3 min
**Parallelizable with:** Task 8 is sequential (same files); none
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs:181-185` (delete the rejection)
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/SessionOptions.cs` (add `MaxEventSubscribersPerSession`, default 8)
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Configuration/GatewayOptionsValidatorTests.cs`
**Steps:** Failing test: `AllowMultipleEventSubscribers=true` now validates clean. Remove rule, add option. Green. Build+test. Commit.
### Task 8: Subscriber-lease collection + cap enforcement
**Classification:** standard
**Estimated implement time:** ~4 min
**Parallelizable with:** none
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs` (replace `_activeEventSubscriberCount:16` with a lease collection; honor `allowMultipleSubscribers`; reject N+1 with new `SessionManagerErrorCode.EventSubscriberLimitReached`)
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManagerErrorCode.cs`
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs`
**Steps:** Failing tests: N subscribers attach concurrently up to the cap; N+1 throws `EventSubscriberLimitReached`; single-subscriber mode still rejects the 2nd. Implement. Green. Build+test. Commit.
### Task 9: Multi-subscriber end-to-end test via FakeWorkerHarness
**Classification:** standard
**Estimated implement time:** ~4 min
**Parallelizable with:** none
**Files:**
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs`
**Steps:** Two concurrent `StreamEvents` RPCs on one session both receive every worker event; one cancels, the other continues. Build + full fake-worker suite. Commit.
---
## Phase 3 — Reconnectable sessions
### Task 10: Proto — `ReplayGap` signal (contract change)
**Classification:** high-risk (contracts → all clients)
**Estimated implement time:** ~5 min
**Parallelizable with:** none
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Contracts/Protos/mxaccess_gateway.proto` (add a `ReplayGap` marker — a `replay_gap` bool + `oldest_available_sequence` on the stream response, or a dedicated leading status frame)
- Regenerate: `dotnet build src/ZB.MOM.WW.MxGateway.Contracts/ZB.MOM.WW.MxGateway.Contracts.csproj`; **commit** `src/ZB.MOM.WW.MxGateway.Contracts/Generated/*` (net48 regen rule — see `project_proto_codegen_regen`)
- Test: contracts build both TFMs (net10.0;net48)
**Steps:** Add field(s), regen, `del Generated/*.cs` if needed to force regen, commit generated. Build contracts both TFMs. Commit. **This unblocks Task 11 and Task 14.**
### Task 11: Detach-grace session retention
**Classification:** high-risk (session lifecycle)
**Estimated implement time:** ~5 min
**Parallelizable with:** none
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs` (add `DetachGrace` retention: on last-subscriber-drop, keep session alive for `DetachGraceSeconds` instead of closing)
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/SessionOptions.cs` (`DetachGraceSeconds`; new disconnect-policy value `DetachGrace`)
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionLeaseMonitorHostedService.cs` (sweep expired detach-grace windows)
- Test: `GatewaySessionTests.cs`
**Steps:** Failing test: subscriber drop under `DetachGrace` keeps the session `Ready` until the window expires, then closes. Implement. Green. Build + session/lease tests. Commit.
### Task 12: Replay-on-reconnect + emit `ReplayGap`
**Classification:** high-risk
**Estimated implement time:** ~5 min
**Parallelizable with:** none
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs` (on attach with `AfterWorkerSequence`, call `distributor.TryGetReplayFrom`; replay buffered events then resume live; if `gap`, send the `ReplayGap` marker first)
- Test: `EventStreamServiceTests.cs`
**Steps:** Failing tests: reconnect with a known sequence replays only newer events; reconnect past the ring horizon yields `ReplayGap`. Implement. Green. Build + test. Commit.
### Task 13: Owner re-validation on reconnect
**Classification:** small
**Estimated implement time:** ~3 min
**Parallelizable with:** Task 12 (different assertion in same service — sequence after 12)
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs` (reconnect requires caller `OwnerKeyId` == session owner → `PermissionDenied`)
- Test: `EventStreamServiceTests.cs`
**Steps:** Failing test: a different API key cannot resume someone else's session. Implement. Green. Build+test. Commit.
### Task 14: Client `ReplayGap` handling — all 5 clients
**Classification:** standard
**Estimated implement time:** ~5 min each (dispatch as 5 parallel sub-tasks; disjoint files)
**Parallelizable with:** each other (14a14e)
**Files (one client each):**
- 14a dotnet: `clients/dotnet/.../` stream consumer + test
- 14b Go: `clients/go/mxgateway/` + `go test`
- 14c Python: `clients/python/src/.../` + `pytest`
- 14d Rust: `clients/rust/crates/.../` + `cargo test`/clippy
- 14e Java: `clients/java/.../` + `gradle test` (macOS JDK 21; **revert generated `MxaccessGateway.java` churn** per `project_java_generated_churn`)
**Steps (each):** Regenerate client stubs from the updated proto; surface `ReplayGap` to the caller (callback/return marker) so apps know to re-snapshot; test the gap path. Build+test that client. Commit per client.
### Task 15: Reconnect integration test (fake worker)
**Classification:** standard
**Estimated implement time:** ~4 min
**Parallelizable with:** none
**Files:**
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs`
**Steps:** Stream, drop, reconnect within grace with last sequence → no gap; reconnect after ring overflow → `ReplayGap`. Build + suite. Commit.
---
## Phase 4 — Per-session ACL
### Task 16: gRPC session-owner gate + all-sessions admin scope
**Classification:** high-risk (security)
**Estimated implement time:** ~5 min
**Parallelizable with:** none
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Grpc/MxAccessGatewayService.cs` (`Invoke`/`StreamEvents`/`CloseSession` require caller key == session `OwnerKeyId`, or a key bearing a new all-sessions scope)
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Security/Authorization/` (define the all-sessions scope; map it)
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs`
**Steps:** Failing tests: foreign key gets `PermissionDenied` on another key's session; owner and all-sessions-scoped key succeed. Implement. Green. Build + gateway tests. Commit.
### Task 17: Session `Tag` + dashboard group→tag config
**Classification:** small
**Estimated implement time:** ~3 min
**Parallelizable with:** Task 16 (disjoint files)
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs` (+`SessionManager` to set an optional `Tag` from the open request)
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/` Dashboard options (`GroupToSessionTag` map)
- Test: config-binding test
**Steps:** Failing test: a session carries its tag; config map binds. Implement. Green. Build+test. Commit.
### Task 18: EventsHub per-session ACL + hub-token session-tag claim
**Classification:** standard
**Estimated implement time:** ~5 min
**Parallelizable with:** none (depends on 17)
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/EventsHub.cs:39-54` (replace `TODO(per-session-acl)`: Admin sees all; Viewer allowed only if the session's tag is in the user's `GroupToSessionTag`-derived allowed set)
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Dashboard/HubTokenService.cs` (mint an allowed-session-tag claim)
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Dashboard/HubTokenAuthenticationHandler.cs` (carry the claim back)
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/EventsHubTests.cs`
> **Decision flagged in the design doc:** default is "Admin all / Viewer by tag map." If the owner chose the strict variant (Viewers see nothing unless granted), invert the default here — the executor must confirm which before implementing.
**Steps:** Failing tests: Viewer without the tag is refused `SubscribeSession`; Admin allowed; Viewer with the mapped tag allowed. Implement. Green. Build + dashboard tests. Commit.
### Task 19: ACL tests incl. live LDAP users
**Classification:** standard
**Estimated implement time:** ~4 min
**Parallelizable with:** none
**Files:**
- Test: `src/ZB.MOM.WW.MxGateway.IntegrationTests/DashboardLdapLiveTests.cs` (extend; gated `MXGATEWAY_RUN_LIVE_LDAP_TESTS=1`)
**Steps:** With `multi-role` (Admin) vs `gw-viewer` (Viewer), assert subscribe authorization differs by session tag. Document if skipped (no live LDAP). Commit.
---
## Phase 5 — Orphan-worker reattach (overturns the CLAUDE.md rule)
### Task 20: Stable gateway-instance id + stable pipe naming
**Classification:** standard
**Estimated implement time:** ~4 min
**Parallelizable with:** none
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs:433` (pipe name uses a persisted stable gateway-instance id instead of `Environment.ProcessId`)
- Create: gateway-instance-id persistence (small file/SQLite row under `C:\ProgramData\MxGateway\`)
- Test: `SessionManagerTests.cs` / a new instance-id test
**Steps:** Failing test: pipe name is stable across simulated restarts (same instance id). Implement. Green. Build + tests. Commit.
### Task 21: Adoption manifest store (SQLite)
**Classification:** high-risk (persistence, security material)
**Estimated implement time:** ~5 min
**Parallelizable with:** none
**Files:**
- Create: `src/ZB.MOM.WW.MxGateway.Server/Workers/WorkerAdoptionManifest.cs` (persist `sessionId → workerPid, nonce, ownerKeyId, pipeName`; upsert on launch, delete on clean close)
- Modify: gateway-auth SQLite schema/migration
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/WorkerAdoptionManifestTests.cs`
> Nonce is security material — store it like other secrets (no plaintext logging; standing rule).
**Steps:** Failing test: manifest round-trips an entry; clean close removes it. Implement. Green. Build + tests. Commit.
### Task 22: Proto — worker adopt/reconnect frame (contract change)
**Classification:** high-risk (contracts → worker)
**Estimated implement time:** ~5 min
**Parallelizable with:** none
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Contracts/Protos/mxaccess_worker.proto` (add an adopt/reconnect `WorkerEnvelope` frame: worker presents `sessionId` + `nonce`; gateway ACK/NACK)
- Regenerate + **commit** `Generated/*` (net48 rule)
- Test: contracts build both TFMs
**Steps:** Add frame, regen, commit generated, build both TFMs. Commit. **Unblocks Tasks 2425.**
### Task 23: Worker phone-home reconnect loop + self-terminate
**Classification:** high-risk (worker, net48/x86 — **windev**)
**Estimated implement time:** ~5 min
**Parallelizable with:** none
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Worker/Ipc/WorkerPipeClient.cs` (on pipe drop: reconnect loop with bounded backoff to the stable pipe name; present the adopt frame)
- Modify: `src/ZB.MOM.WW.MxGateway.Worker/` runtime (self-terminate after `MaxOrphanLifetime` with no adoption)
- Test: `src/ZB.MOM.WW.MxGateway.Worker.Tests/` (net48/x86 on windev)
**Steps:** Failing test (fake pipe server): worker retries and adopts; gives up + self-terminates past the lifetime. Build x86 + worker tests on **windev**. Commit. *(net48: no init-only/positional records.)*
### Task 24: Gateway adoption — re-open pipes, nonce-validate, reject impostors
**Classification:** high-risk (security, lifecycle — **windev** for live)
**Estimated implement time:** ~5 min
**Parallelizable with:** none
**Files:**
- Create: `src/ZB.MOM.WW.MxGateway.Server/Workers/OrphanWorkerAdopter.cs` (startup: read manifest, re-open pipe servers, accept adopt frames, validate nonce → adopt or reject)
- Modify: gateway startup hosted-service order (adopter runs **before** `OrphanWorkerTerminator`; terminator handles only un-adoptable/foreign workers)
- Test: `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/OrphanWorkerAdopterTests.cs`
**Steps:** Failing tests: matching nonce adopts and rebuilds the session; mismatched nonce is rejected and the worker terminated. Implement. Green. Build + tests. Commit.
### Task 25: Resync adopted worker + `ReplayGap` to subscribers
**Classification:** standard
**Estimated implement time:** ~4 min
**Parallelizable with:** none
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Workers/OrphanWorkerAdopter.cs` (after adoption, `GetSessionState`/`GetWorkerInfo` to resync; reattached subscribers get `ReplayGap` since the ring is gone)
- Test: `OrphanWorkerAdopterTests.cs`
**Steps:** Failing test: adopted session reports resynced state; a resuming subscriber receives `ReplayGap`. Implement. Green. Build + tests. Commit.
### Task 26: `EnableOrphanReattach` flag (default off) + terminator fallback
**Classification:** small
**Estimated implement time:** ~3 min
**Parallelizable with:** none
**Files:**
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Configuration/WorkerOptions.cs` (`EnableOrphanReattach`, default `false`)
- Modify: `src/ZB.MOM.WW.MxGateway.Server/Workers/OrphanWorkerTerminator.cs` (unchanged default behavior when reattach disabled)
- Test: `OrphanWorkerTerminatorTests.cs` / adopter test
**Steps:** Failing test: with the flag off, startup terminates (today's behavior); on, it adopts. Implement. Green. Build + tests. Commit.
### Task 27: Gateway-restart reattach round-trip (integration, **windev** + live worker)
**Classification:** high-risk
**Estimated implement time:** ~5 min
**Parallelizable with:** none
**Files:**
- Test: `src/ZB.MOM.WW.MxGateway.IntegrationTests/WorkerLiveMxAccessSmokeTests.cs` (gated `MXGATEWAY_RUN_LIVE_MXACCESS_TESTS=1`)
**Steps:** Open session → simulate gateway restart → adopter re-adopts the surviving worker → session usable → subscriber gets `ReplayGap` then live events. Run on **windev** with live MXAccess. Document if skipped.
### Task 28: Documented-rule reversals + stillpending refresh
**Classification:** trivial (doc-only)
**Estimated implement time:** ~3 min
**Parallelizable with:** none (final)
**Files:**
- Modify: `CLAUDE.md` (line ~77 — reattach now supported, opt-in/bounded)
- Modify: `docs/DesignDecisions.md` (`:63-73` reconnect, `:75-80` multi-subscriber, reattach rationale → mark superseded with this design's date/commit)
- Modify: `gateway.md` (post-v1 revisit items — reflect what shipped)
- Modify: `stillpending.md` (§2 items: mark fan-out/reconnect/ACL/reattach Resolved with commit refs)
- Modify: `docs/GatewayConfiguration.md` (new options: `MaxEventSubscribersPerSession`, `ReplayBufferCapacity`, `ReplayRetentionSeconds`, `DetachGraceSeconds`, `GroupToSessionTag`, `EnableOrphanReattach`, `MaxOrphanLifetime`)
**Steps:** Edit docs to match shipped behavior. Commit.
---
## Verification matrix
| Phase | Build/test | Host |
|---|---|---|
| 14 (gateway, clients) | `dotnet build` + gateway/fake-worker tests; per-client `go/pytest/cargo/gradle/dotnet test` | local (macOS) |
| 3/5 proto changes | regen + commit `Generated/`; build contracts both TFMs; rebuild touched clients | local |
| 5 worker (net48/x86) | `dotnet build -p:Platform=x86` + `Worker.Tests` | **windev** |
| 5 live reattach + Phase-4 LDAP | opt-in gated integration tests | **windev** / live LDAP |
## Final integration review
After all tasks: dispatch a final integration reviewer over `git diff main..HEAD` focusing on the live event path, concurrency in `SessionEventDistributor`, security gates (ACL + nonce adoption), and the three documented-rule reversals. Then use superpowers-extended-cc:finishing-a-development-branch.
@@ -0,0 +1,34 @@
{
"planPath": "docs/plans/2026-06-15-session-resilience.md",
"tasks": [
{"id": 108, "subject": "Task 1: Add OwnerKeyId to the session", "status": "pending"},
{"id": 109, "subject": "Task 2: SessionEventDistributor skeleton", "status": "pending", "blockedBy": [108]},
{"id": 110, "subject": "Task 3: Bounded replay ring buffer", "status": "pending", "blockedBy": [109]},
{"id": 111, "subject": "Task 4: Rewire AttachEventSubscriber + EventStreamService onto distributor", "status": "pending", "blockedBy": [110]},
{"id": 112, "subject": "Task 5: Per-subscriber backpressure isolation", "status": "pending", "blockedBy": [111]},
{"id": 113, "subject": "Task 6: Dashboard broadcaster becomes a distributor subscriber", "status": "pending", "blockedBy": [111]},
{"id": 114, "subject": "Task 7: Remove validator block + add subscriber cap option", "status": "pending", "blockedBy": [112]},
{"id": 115, "subject": "Task 8: Subscriber-lease collection + cap enforcement", "status": "pending", "blockedBy": [114]},
{"id": 116, "subject": "Task 9: Multi-subscriber end-to-end test (FakeWorkerHarness)", "status": "pending", "blockedBy": [115]},
{"id": 117, "subject": "Task 10: Proto - ReplayGap signal", "status": "pending", "blockedBy": [116]},
{"id": 118, "subject": "Task 11: Detach-grace session retention", "status": "pending", "blockedBy": [117]},
{"id": 119, "subject": "Task 12: Replay-on-reconnect + emit ReplayGap", "status": "pending", "blockedBy": [118, 110]},
{"id": 120, "subject": "Task 13: Owner re-validation on reconnect", "status": "pending", "blockedBy": [119, 108]},
{"id": 121, "subject": "Task 14: Client ReplayGap handling - all 5 clients", "status": "pending", "blockedBy": [117]},
{"id": 122, "subject": "Task 15: Reconnect integration test (fake worker)", "status": "pending", "blockedBy": [119]},
{"id": 123, "subject": "Task 16: gRPC session-owner gate + all-sessions admin scope", "status": "pending", "blockedBy": [116, 108]},
{"id": 124, "subject": "Task 17: Session Tag + dashboard group-to-tag config", "status": "pending", "blockedBy": [116]},
{"id": 125, "subject": "Task 18: EventsHub per-session ACL + hub-token tag claim", "status": "pending", "blockedBy": [124]},
{"id": 126, "subject": "Task 19: ACL tests incl. live LDAP users", "status": "pending", "blockedBy": [125]},
{"id": 127, "subject": "Task 20: Stable gateway-instance id + stable pipe naming", "status": "pending", "blockedBy": [126]},
{"id": 128, "subject": "Task 21: Adoption manifest store (SQLite)", "status": "pending", "blockedBy": [127]},
{"id": 129, "subject": "Task 22: Proto - worker adopt/reconnect frame", "status": "pending", "blockedBy": [128]},
{"id": 130, "subject": "Task 23: Worker phone-home reconnect loop + self-terminate", "status": "pending", "blockedBy": [129]},
{"id": 131, "subject": "Task 24: Gateway adoption - re-open pipes, nonce-validate, reject impostors", "status": "pending", "blockedBy": [130]},
{"id": 132, "subject": "Task 25: Resync adopted worker + ReplayGap to subscribers", "status": "pending", "blockedBy": [131, 119]},
{"id": 133, "subject": "Task 26: EnableOrphanReattach flag (default off) + terminator fallback", "status": "pending", "blockedBy": [131]},
{"id": 134, "subject": "Task 27: Gateway-restart reattach round-trip (WINDEV + live worker)", "status": "pending", "blockedBy": [132, 133]},
{"id": 135, "subject": "Task 28: Documented-rule reversals + stillpending refresh", "status": "pending", "blockedBy": [134]}
],
"lastUpdated": "2026-06-15"
}
@@ -203,6 +203,7 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
GatewaySession session = await _sessionManager.OpenSessionAsync(
new SessionOpenRequest(BackendName, MonitorClientName, Guid.NewGuid().ToString("N"), CommandTimeout: null),
MonitorClientName,
ownerKeyId: null,
stoppingToken)
.ConfigureAwait(false);
lock (_sync) { _session = session; }
@@ -2,4 +2,6 @@ namespace ZB.MOM.WW.MxGateway.Server.Configuration;
public sealed record EffectiveEventConfiguration(
int QueueCapacity,
string BackpressurePolicy);
string BackpressurePolicy,
int ReplayBufferCapacity,
double ReplayRetentionSeconds);
@@ -6,4 +6,5 @@ public sealed record EffectiveSessionConfiguration(
int MaxPendingCommandsPerSession,
int DefaultLeaseSeconds,
int LeaseSweepIntervalSeconds,
bool AllowMultipleEventSubscribers);
bool AllowMultipleEventSubscribers,
int MaxEventSubscribersPerSession);
@@ -11,4 +11,20 @@ public sealed class EventOptions
/// Gets the backpressure policy for event queue overflow.
/// </summary>
public EventBackpressurePolicy BackpressurePolicy { get; init; } = EventBackpressurePolicy.FailFast;
/// <summary>
/// Gets the maximum number of events retained in the per-session replay ring buffer
/// used to re-deliver events a returning subscriber missed (reconnect/reattach).
/// When the buffer exceeds this count the oldest retained events are evicted first.
/// A value of <c>0</c> disables replay retention entirely.
/// </summary>
public int ReplayBufferCapacity { get; init; } = 1024;
/// <summary>
/// Gets the maximum age, in seconds, of an event retained in the per-session replay
/// ring buffer. Entries older than this are evicted regardless of capacity. A value
/// of <c>0</c> disables age-based eviction (only <see cref="ReplayBufferCapacity"/>
/// bounds the buffer).
/// </summary>
public double ReplayRetentionSeconds { get; init; } = 300;
}
@@ -46,10 +46,13 @@ public sealed class GatewayConfigurationProvider(IOptions<GatewayOptions> option
MaxPendingCommandsPerSession: value.Sessions.MaxPendingCommandsPerSession,
DefaultLeaseSeconds: value.Sessions.DefaultLeaseSeconds,
LeaseSweepIntervalSeconds: value.Sessions.LeaseSweepIntervalSeconds,
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers),
AllowMultipleEventSubscribers: value.Sessions.AllowMultipleEventSubscribers,
MaxEventSubscribersPerSession: value.Sessions.MaxEventSubscribersPerSession),
Events: new EffectiveEventConfiguration(
QueueCapacity: value.Events.QueueCapacity,
BackpressurePolicy: value.Events.BackpressurePolicy.ToString()),
BackpressurePolicy: value.Events.BackpressurePolicy.ToString(),
ReplayBufferCapacity: value.Events.ReplayBufferCapacity,
ReplayRetentionSeconds: value.Events.ReplayRetentionSeconds),
Dashboard: new EffectiveDashboardConfiguration(
Enabled: value.Dashboard.Enabled,
AllowAnonymousLocalhost: value.Dashboard.AllowAnonymousLocalhost,
@@ -177,12 +177,10 @@ public sealed class GatewayOptionsValidator : OptionsValidatorBase<GatewayOption
options.LeaseSweepIntervalSeconds,
"MxGateway:Sessions:LeaseSweepIntervalSeconds must be greater than zero.",
builder);
if (options.AllowMultipleEventSubscribers)
{
builder.Add(
"MxGateway:Sessions:AllowMultipleEventSubscribers is not supported until event fan-out is implemented.");
}
AddIfNotPositive(
options.MaxEventSubscribersPerSession,
"MxGateway:Sessions:MaxEventSubscribersPerSession must be greater than zero.",
builder);
}
private static void ValidateEvents(EventOptions options, ValidationBuilder builder)
@@ -193,6 +191,16 @@ public sealed class GatewayOptionsValidator : OptionsValidatorBase<GatewayOption
{
builder.Add("MxGateway:Events:BackpressurePolicy must be a supported backpressure policy.");
}
// ReplayBufferCapacity and ReplayRetentionSeconds are bounds on the replay ring
// buffer; 0 is a valid value (disables that dimension), so only negatives fail.
AddIfNegative(
options.ReplayBufferCapacity,
"MxGateway:Events:ReplayBufferCapacity must be greater than or equal to zero.",
builder);
builder.RequireThat(
options.ReplayRetentionSeconds >= 0,
"MxGateway:Events:ReplayRetentionSeconds must be greater than or equal to zero.");
}
private static void ValidateDashboard(DashboardOptions options, ValidationBuilder builder)
@@ -27,4 +27,11 @@ public sealed class SessionOptions
/// Gets a value indicating whether multiple event subscribers are allowed per session.
/// </summary>
public bool AllowMultipleEventSubscribers { get; init; }
/// <summary>
/// Gets the maximum number of concurrent event subscribers per session.
/// Applies when <see cref="AllowMultipleEventSubscribers"/> is <see langword="true"/>;
/// effectively 1 when it is <see langword="false"/>. Must be greater than zero.
/// </summary>
public int MaxEventSubscribersPerSession { get; init; } = 8;
}
@@ -138,6 +138,7 @@ public sealed class DashboardLiveDataService : IDashboardLiveDataService, IAsync
GatewaySession session = await _sessionManager.OpenSessionAsync(
new SessionOpenRequest(BackendName, ClientName, Guid.NewGuid().ToString("N"), CommandTimeout: null),
ClientName,
ownerKeyId: null,
cancellationToken)
.ConfigureAwait(false);
@@ -24,9 +24,21 @@ public sealed class DashboardEventBroadcaster(
return;
}
Task send = hubContext.Clients
.Group(EventsHub.GroupName(sessionId))
.SendAsync(EventsHub.EventMessage, mxEvent);
// Wrap the Task acquisition in a try/catch so a hypothetical synchronous throw
// from SendAsync (e.g. an implementation that throws before returning the Task)
// cannot escape Publish. The interface contract is never-throw; fire-and-forget.
Task send;
try
{
send = hubContext.Clients
.Group(EventsHub.GroupName(sessionId))
.SendAsync(EventsHub.EventMessage, mxEvent);
}
catch (Exception ex)
{
logger.LogDebug(ex, "Dashboard event mirror to session {SessionId} threw synchronously.", sessionId);
return;
}
if (!send.IsCompletedSuccessfully)
{
@@ -1,9 +1,7 @@
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
@@ -13,14 +11,46 @@ namespace ZB.MOM.WW.MxGateway.Server.Grpc;
public sealed class EventStreamService(
ISessionManager sessionManager,
IOptions<GatewayOptions> options,
MxAccessGrpcMapper mapper,
GatewayMetrics metrics,
IDashboardEventBroadcaster dashboardEventBroadcaster,
ILogger<EventStreamService> logger) : IEventStreamService
GatewayMetrics metrics) : IEventStreamService
{
/// <summary>
/// Streams events from a session to the client asynchronously.
/// Streams events from a session to the client asynchronously.
/// </summary>
/// <remarks>
/// <para>
/// Task 4 rewired this from a per-RPC channel that drained the session directly
/// to reading the subscriber's lease channel fed by the session's single
/// <see cref="SessionEventDistributor"/> pump. The pump owns the single drain of
/// the worker event stream and the worker→public mapping (mirroring the former
/// <c>ProduceEventsAsync</c>); this loop is the per-subscriber boundary that
/// applies the per-RPC filter (<c>AfterWorkerSequence</c>), queue-depth metrics,
/// and the backpressure/overflow policy.
/// </para>
/// <para>
/// Task 6 moved the dashboard mirror OFF this per-RPC loop. The dashboard is now a
/// first-class internal subscriber on the session's
/// <see cref="SessionEventDistributor"/> (see <c>GatewaySession.StartDashboardMirror</c>),
/// so it receives session events even when no gRPC client is streaming. This loop no
/// longer mirrors to the dashboard. One deliberate consequence: the dashboard now sees
/// RAW session events, not the per-gRPC-subscriber <c>AfterWorkerSequence</c>-filtered
/// view this loop applies — the dashboard is a separate LDAP-authenticated monitoring
/// view that should see the session's full event activity (per-session dashboard ACL is
/// the separate Task 18).
/// </para>
/// <para>
/// Overflow handling (Task 5): the distributor's per-subscriber channel is bounded
/// and the pump writes non-blocking. When this subscriber's channel is full the pump
/// applies the per-subscriber backpressure policy and completes this subscriber's
/// channel with a <see cref="SessionManagerException"/>
/// (<see cref="SessionManagerErrorCode.EventQueueOverflow"/>). That terminal fault
/// surfaces here when the reader's <c>MoveNextAsync</c> throws, and — like the
/// pre-epic per-RPC overflow — it propagates to the gRPC client unchanged. The
/// overflow metric, and (in the legacy single-subscriber FailFast case) the session
/// fault + fault metric, are recorded by the distributor's overflow handler so the
/// session, the pump, and other subscribers are isolated from this subscriber's
/// slowness.
/// </para>
/// </remarks>
/// <param name="request">Stream events request.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Async enumerable of MX events.</returns>
@@ -35,151 +65,80 @@ public sealed class EventStreamService(
$"Session {request.SessionId} was not found.");
}
using IDisposable subscriber = session.AttachEventSubscriber(
// No `using` here — subscriber.Dispose() is called exactly once in the finally
// block below, which also disposes the reader. A `using` declaration would add a
// second Dispose on the same path and double-decrement the session subscriber count.
IEventSubscriberLease subscriber = session.AttachEventSubscriber(
options.Value.Sessions.AllowMultipleEventSubscribers);
using CancellationTokenSource streamCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
int streamQueueDepth = 0;
Channel<MxEvent> eventQueue = Channel.CreateBounded<MxEvent>(
new BoundedChannelOptions(options.Value.Events.QueueCapacity)
{
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
Task producerTask = ProduceEventsAsync(
session,
request.AfterWorkerSequence,
eventQueue.Writer,
() =>
{
Interlocked.Increment(ref streamQueueDepth);
metrics.AdjustGrpcEventStreamQueueDepth(1);
},
streamCts.Token);
ulong afterWorkerSequence = request.AfterWorkerSequence;
IAsyncEnumerator<MxEvent> reader = subscriber.Reader
.ReadAllAsync(cancellationToken)
.GetAsyncEnumerator(cancellationToken);
try
{
await foreach (MxEvent mxEvent in eventQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
while (true)
{
Interlocked.Decrement(ref streamQueueDepth);
metrics.AdjustGrpcEventStreamQueueDepth(-1);
MxEvent mxEvent;
try
{
if (!await reader.MoveNextAsync().ConfigureAwait(false))
{
break;
}
mxEvent = reader.Current;
}
catch (WorkerClientException workerException)
{
// The distributor pump completes every subscriber channel with the source
// fault when the worker event stream terminates abnormally; that surfaces
// here. Mirror the pre-Task-4 ProduceEventsAsync behavior: fault the
// session and record the metric, then propagate the terminal fault to the
// gRPC client.
session.MarkFaulted(workerException.Message);
metrics.Fault(WorkerClientErrorCode.WorkerFaulted.ToString());
throw;
}
// Per-RPC filter stays at the subscriber boundary: each request may resume
// from a different AfterWorkerSequence, so the shared pump fans raw events and
// this loop drops the ones at or below the caller's watermark.
if (mxEvent.WorkerSequence <= afterWorkerSequence)
{
continue;
}
// Queue-depth gauge tracks events the pump has fanned into this subscriber's
// channel but the client has not yet consumed — the same "buffered, not yet
// delivered" quantity the pre-Task-4 per-RPC channel reported. The bounded
// subscriber channel supports counting, so reconcile the gauge to the current
// backlog; falling back to a no-op delta if a channel ever cannot count.
int backlog = subscriber.Reader.CanCount ? subscriber.Reader.Count : streamQueueDepth;
int delta = backlog - streamQueueDepth;
if (delta != 0)
{
streamQueueDepth = backlog;
metrics.AdjustGrpcEventStreamQueueDepth(delta);
}
yield return mxEvent;
}
await producerTask.ConfigureAwait(false);
}
finally
{
await streamCts.CancelAsync().ConfigureAwait(false);
await reader.DisposeAsync().ConfigureAwait(false);
subscriber.Dispose();
try
if (streamQueueDepth != 0)
{
await producerTask.ConfigureAwait(false);
}
catch (OperationCanceledException) when (streamCts.IsCancellationRequested)
{
}
catch (Exception exception)
{
logger.LogDebug(
exception,
"Event stream producer stopped for session {SessionId}.",
request.SessionId);
}
int remainingDepth = Interlocked.Exchange(ref streamQueueDepth, 0);
if (remainingDepth > 0)
{
metrics.AdjustGrpcEventStreamQueueDepth(-remainingDepth);
metrics.AdjustGrpcEventStreamQueueDepth(-streamQueueDepth);
streamQueueDepth = 0;
}
metrics.StreamDisconnected("Detached");
}
}
private async Task ProduceEventsAsync(
GatewaySession session,
ulong afterWorkerSequence,
ChannelWriter<MxEvent> writer,
Action eventQueued,
CancellationToken cancellationToken)
{
try
{
await foreach (WorkerEvent workerEvent in session
.ReadEventsAsync(cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
MxEvent publicEvent = mapper.MapEvent(workerEvent);
if (publicEvent.WorkerSequence <= afterWorkerSequence)
{
continue;
}
// Mirror the event to the dashboard EventsHub group for this
// session. Fire-and-forget — broadcast errors must not affect
// the source gRPC stream. Server-041: the
// IDashboardEventBroadcaster contract documents Publish as
// never-throw, but we enforce that at the seam too, so a
// future implementation that adds synchronous validation or
// a serializer hop cannot fault the producer loop and end
// this client's gRPC stream.
try
{
dashboardEventBroadcaster.Publish(session.SessionId, publicEvent);
}
catch (Exception ex)
{
logger.LogDebug(
ex,
"Dashboard event mirror threw for session {SessionId}; continuing.",
session.SessionId);
}
if (!writer.TryWrite(publicEvent))
{
string message = $"Session {session.SessionId} event stream queue overflowed.";
metrics.QueueOverflow("grpc-event-stream");
if (options.Value.Events.BackpressurePolicy == EventBackpressurePolicy.FailFast)
{
session.MarkFaulted(message);
metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
}
else
{
logger.LogDebug(
"Disconnecting event stream for session {SessionId} after queue overflow.",
session.SessionId);
}
writer.TryComplete(new SessionManagerException(
SessionManagerErrorCode.EventQueueOverflow,
message));
return;
}
eventQueued();
}
writer.TryComplete();
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
writer.TryComplete();
}
catch (Exception exception)
{
if (exception is WorkerClientException)
{
session.MarkFaulted(exception.Message);
metrics.Fault(WorkerClientErrorCode.WorkerFaulted.ToString());
}
writer.TryComplete(exception);
}
}
}
@@ -36,6 +36,7 @@ public sealed class MxAccessGatewayService(
.OpenSessionAsync(
SessionOpenRequest.FromContract(request),
ResolveClientIdentity(),
identityAccessor.Current?.KeyId,
context.CancellationToken)
.ConfigureAwait(false);
@@ -1,4 +1,10 @@
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Workers;
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
@@ -7,6 +13,7 @@ public sealed class GatewaySession
{
private readonly object _syncRoot = new();
private readonly SemaphoreSlim _closeLock = new(1, 1);
private readonly SessionEventStreaming _eventStreaming;
private IWorkerClient? _workerClient;
private SessionState _state = SessionState.Creating;
private string? _finalFault;
@@ -14,6 +21,12 @@ public sealed class GatewaySession
private DateTimeOffset? _leaseExpiresAt;
private bool _closeStarted;
private int _activeEventSubscriberCount;
private SessionEventDistributor? _eventDistributor;
private bool _eventDistributorStarted;
private bool _dashboardMirrorStarted;
private IEventSubscriberLease? _dashboardMirrorLease;
private Task? _dashboardMirrorTask;
private CancellationTokenSource? _dashboardMirrorCts;
private readonly Dictionary<(int ServerHandle, int ItemHandle), SessionItemRegistration> _items = [];
/// <summary>
@@ -30,6 +43,11 @@ public sealed class GatewaySession
/// <param name="startupTimeout">Timeout for worker process startup.</param>
/// <param name="shutdownTimeout">Timeout for worker process shutdown.</param>
/// <param name="openedAt">Timestamp when the session opened.</param>
/// <remarks>
/// Constructs a session with no owner key (<see cref="OwnerKeyId"/> will be null).
/// Authenticated call sites that have a resolved API key identity must use the
/// 12-parameter overload and pass the caller's key id explicitly.
/// </remarks>
public GatewaySession(
string sessionId,
string backendName,
@@ -48,6 +66,7 @@ public sealed class GatewaySession
pipeName,
nonce,
clientIdentity,
ownerKeyId: null,
clientSessionName,
clientCorrelationId,
commandTimeout,
@@ -66,6 +85,7 @@ public sealed class GatewaySession
/// <param name="pipeName">Name of the named pipe for gateway-worker IPC.</param>
/// <param name="nonce">Security nonce for worker validation.</param>
/// <param name="clientIdentity">Client identity from the authentication context.</param>
/// <param name="ownerKeyId">API key identifier of the caller that created this session.</param>
/// <param name="clientSessionName">Client-supplied session name.</param>
/// <param name="clientCorrelationId">Client-supplied correlation identifier.</param>
/// <param name="commandTimeout">Timeout for command invocation.</param>
@@ -73,19 +93,30 @@ public sealed class GatewaySession
/// <param name="shutdownTimeout">Timeout for worker process shutdown.</param>
/// <param name="leaseDuration">Duration of the session lease.</param>
/// <param name="openedAt">Timestamp when the session opened.</param>
/// <param name="eventStreaming">
/// Dependencies the session uses to construct and own its
/// <see cref="SessionEventDistributor"/> (the single per-session worker-event pump
/// that fans raw mapped <see cref="MxEvent"/>s to every subscriber lease). When
/// <see langword="null"/>, defaults are used (no replay logger, system clock, a
/// fresh mapper, and default <see cref="EventOptions"/>) so unit tests that build a
/// session directly still get a working distributor. Production passes the
/// DI-resolved dependencies.
/// </param>
public GatewaySession(
string sessionId,
string backendName,
string pipeName,
string nonce,
string? clientIdentity,
string? ownerKeyId,
string? clientSessionName,
string? clientCorrelationId,
TimeSpan commandTimeout,
TimeSpan startupTimeout,
TimeSpan shutdownTimeout,
TimeSpan leaseDuration,
DateTimeOffset openedAt)
DateTimeOffset openedAt,
SessionEventStreaming? eventStreaming = null)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
@@ -112,6 +143,7 @@ public sealed class GatewaySession
PipeName = pipeName;
Nonce = nonce;
ClientIdentity = clientIdentity;
OwnerKeyId = ownerKeyId;
ClientSessionName = clientSessionName;
ClientCorrelationId = clientCorrelationId;
CommandTimeout = commandTimeout;
@@ -121,6 +153,7 @@ public sealed class GatewaySession
OpenedAt = openedAt;
_lastClientActivityAt = openedAt;
_leaseExpiresAt = openedAt + leaseDuration;
_eventStreaming = eventStreaming ?? SessionEventStreaming.Default;
}
/// <summary>
@@ -148,6 +181,11 @@ public sealed class GatewaySession
/// </summary>
public string? ClientIdentity { get; }
/// <summary>
/// Gets the API key identifier of the caller that created this session.
/// </summary>
public string? OwnerKeyId { get; }
/// <summary>
/// Gets the client-supplied session name.
/// </summary>
@@ -318,9 +356,268 @@ public sealed class GatewaySession
/// <summary>
/// Transitions the session to the Ready state.
/// </summary>
/// <remarks>
/// On becoming Ready the session starts its internal dashboard mirror (Task 6) when a
/// dashboard broadcaster was supplied. The mirror registers an internal subscriber on
/// the distributor and starts the pump <em>before</em> any gRPC client attaches, so the
/// dashboard EventsHub receives session events even with no gRPC subscriber streaming —
/// fixing the "dark feed" where the dashboard only saw events while a gRPC client was
/// actively streaming. Registering the internal subscriber BEFORE
/// <see cref="SessionEventDistributor.StartAsync"/> also avoids the Task 4 hazard where
/// starting the pump at Ready with zero subscribers drained a fast-completing worker
/// stream into nothing and left a later subscriber hanging: there is now always a
/// subscriber (the dashboard one) registered before the pump starts.
/// </remarks>
public void MarkReady()
{
TransitionTo(SessionState.Ready);
StartDashboardMirror();
}
// Constructs and starts the distributor exactly once, registering the subscriber under
// the same start so no event the pump fans can be missed between start and register.
// Started lazily on the FIRST AttachEventSubscriber rather than at MarkReady: today the
// worker event stream is only drained when a client begins streaming, so deferring the
// single drain to first-attach preserves that "events start flowing on subscribe"
// behavior and avoids draining a fast-completing source into the void before any
// subscriber exists. The source factory mirrors the mapping/ordering/start that
// EventStreamService.ProduceEventsAsync used before Task 4: it drains the worker event
// stream in source order and maps each WorkerEvent to the public MxEvent with the same
// mapper, with no skip/filter — per-RPC filtering (e.g. AfterWorkerSequence) stays at the
// subscriber boundary in EventStreamService. Returns a registered lease atomically with
// the start so the very first subscriber sees the stream from its beginning.
private IEventSubscriberLease StartDistributorAndRegister()
{
SessionEventDistributor distributor = EnsureDistributorCreated(out bool startNow);
// Register BEFORE starting the pump so a subscriber is present when the pump begins
// draining — no event is fanned to an empty subscriber set and then missed by this
// first subscriber. StartAsync only schedules the pump task; it never blocks.
IEventSubscriberLease lease = distributor.Register();
StartPumpIfRequested(distributor, startNow);
return lease;
}
// Constructs the distributor exactly once and reports whether THIS caller is the one
// that should start the pump (i.e. it observed the unstarted state and claimed the
// start). Both the construction and the started-flag flip happen under _syncRoot so two
// concurrent callers (e.g. MarkReady's dashboard mirror and a racing first
// AttachEventSubscriber) agree on a single distributor and a single start.
private SessionEventDistributor EnsureDistributorCreated(out bool startNow)
{
lock (_syncRoot)
{
if (_eventDistributor is null)
{
EventOptions eventOptions = _eventStreaming.EventOptions;
_eventDistributor = new SessionEventDistributor(
SessionId,
MapWorkerEventsAsync,
eventOptions.QueueCapacity,
eventOptions.ReplayBufferCapacity,
eventOptions.ReplayRetentionSeconds,
_eventStreaming.DistributorLogger,
_eventStreaming.TimeProvider,
CreateOverflowHandler(eventOptions.BackpressurePolicy));
}
startNow = false;
if (!_eventDistributorStarted)
{
_eventDistributorStarted = true;
startNow = true;
}
return _eventDistributor;
}
}
private static void StartPumpIfRequested(SessionEventDistributor distributor, bool startNow)
{
if (!startNow)
{
return;
}
// StartAsync only schedules the pump via Task.Run and returns a completed task;
// it does not perform any async I/O itself. The sync-over-async call here is
// therefore safe and will not deadlock. Do not make StartAsync truly async
// (i.e., await real I/O before returning) without also changing this call site.
distributor.StartAsync(CancellationToken.None).GetAwaiter().GetResult();
}
// Registers the gateway-owned internal dashboard subscriber on the distributor and starts
// a background loop that mirrors every fanned event to the dashboard broadcaster. Called
// once when the session becomes Ready (idempotent). The internal subscriber is registered
// BEFORE the pump starts (see StartDistributorAndRegister / EnsureDistributorCreated), so
// a subscriber is always present at pump start — the dashboard receives events with no
// gRPC subscriber attached, and the Task 4 "zero-subscriber drain into the void" hang
// cannot occur. No-op when no dashboard broadcaster was supplied (unit tests).
//
// Race-safety (Issue 1): _dashboardMirrorLease and _dashboardMirrorTask are published
// atomically under a SINGLE second lock section, and DisposeAsync reads/nulls them under
// that same lock. After EnsureDistributorCreated/Register/StartPump (all outside _syncRoot
// to avoid lock inversion with the distributor's own lifecycle lock), we re-enter
// _syncRoot and check for concurrent disposal. If the session is already Closing/Closed/
// Faulted at that point, we dispose the just-created lease immediately and do NOT start
// the mirror task, so nothing is orphaned.
private void StartDashboardMirror()
{
IDashboardEventBroadcaster? broadcaster = _eventStreaming.DashboardBroadcaster;
if (broadcaster is null)
{
return;
}
CancellationToken loopToken;
lock (_syncRoot)
{
if (_dashboardMirrorStarted || _state is SessionState.Closing or SessionState.Closed or SessionState.Faulted)
{
return;
}
_dashboardMirrorStarted = true;
_dashboardMirrorCts = new CancellationTokenSource();
loopToken = _dashboardMirrorCts.Token;
}
// Create the distributor (claiming the start if we are first) and register the
// internal subscriber BEFORE starting the pump. isInternal: true keeps the dashboard
// subscriber out of the single-subscriber overflow accounting, so a slow/broken
// dashboard mirror only disconnects itself and never faults the session.
// These three calls are OUTSIDE _syncRoot to avoid holding it across
// EnsureDistributorCreated's own lock and StartAsync's Task.Run.
SessionEventDistributor distributor = EnsureDistributorCreated(out bool startNow);
IEventSubscriberLease lease = distributor.Register(isInternal: true);
StartPumpIfRequested(distributor, startNow);
// Publish BOTH the lease and the task atomically under one lock section so
// DisposeAsync always sees them in a consistent state: either both are set or
// both are null. If the session already started disposal before we got here,
// dispose the lease immediately instead of orphaning it.
lock (_syncRoot)
{
if (_state is SessionState.Closing or SessionState.Closed or SessionState.Faulted)
{
// Disposal already ran (or is in progress) — discard the just-created
// lease now so it is not orphaned. Do NOT launch the mirror task.
lease.Dispose();
return;
}
_dashboardMirrorLease = lease;
_dashboardMirrorTask = Task.Run(
() => RunDashboardMirrorAsync(broadcaster, lease, loopToken),
CancellationToken.None);
}
}
// Reads the internal dashboard subscriber's channel and publishes each RAW fanned event
// to the dashboard broadcaster. The dashboard is a first-class distributor subscriber
// (Task 6), so it sees the session's full raw event activity — NOT the per-gRPC-subscriber
// AfterWorkerSequence filtering that EventStreamService applies at its own boundary. This
// is intentional: the dashboard is a separate LDAP-authenticated monitoring view (per-
// session dashboard ACL is the separate Task 18). Publish is best-effort / never-throw, so
// a slow or broken dashboard cannot fault the session or stall the pump; the bounded
// internal subscriber channel (Task 5 per-subscriber isolation) only disconnects THIS
// mirror on overflow, leaving the session and other subscribers untouched.
private async Task RunDashboardMirrorAsync(
IDashboardEventBroadcaster broadcaster,
IEventSubscriberLease lease,
CancellationToken cancellationToken)
{
try
{
await foreach (MxEvent mxEvent in lease.Reader
.ReadAllAsync(cancellationToken)
.ConfigureAwait(false))
{
try
{
broadcaster.Publish(SessionId, mxEvent);
}
catch (Exception exception)
{
// Publish is documented never-throw, but enforce it here too so a future
// implementation cannot fault the mirror loop. Logs identifiers only.
_eventStreaming.DistributorLogger.LogDebug(
exception,
"Dashboard event mirror threw for session {SessionId}; continuing.",
SessionId);
}
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Teardown path: the session is shutting down the mirror.
}
catch (SessionManagerException)
{
// The internal subscriber's channel overflowed and the distributor disconnected
// it with a terminal overflow fault. That disconnects only the dashboard mirror;
// the session, pump, and any gRPC subscriber are unaffected. Stop mirroring.
}
catch (Exception exception)
{
// Source-fault completion (worker event stream terminated abnormally) surfaces
// here. The session's own fault handling runs via the gRPC path / lifecycle; the
// mirror just stops. Logs identifiers only.
_eventStreaming.DistributorLogger.LogDebug(
exception,
"Dashboard event mirror loop ended for session {SessionId}.",
SessionId);
}
}
// Builds the per-subscriber backpressure handler the distributor invokes when a
// subscriber's bounded channel overflows. The distributor always disconnects the
// offending subscriber with an EventQueueOverflow fault; this handler adds the
// observable side effects, preserving exactly what the pre-epic per-RPC overflow path
// emitted:
// - always record the queue-overflow metric, labeled by subscriber kind;
// - FailFast in the legacy single-subscriber case (isOnlySubscriber): fault the whole
// session and record the fault metric, matching back-compat behavior;
// - FailFast with multiple subscribers, or DisconnectSubscriber in any case: do NOT
// fault the session — the distributor's disconnect of the one slow subscriber is the
// whole remedy, so other subscribers and the pump are unaffected. Multi-subscriber
// FailFast deliberately degrades to a disconnect because faulting a shared session on
// one slow consumer would punish healthy subscribers.
// The delegate now carries isInternal directly (Issue 4), so the metric label is chosen
// without any heuristic: "dashboard-mirror" for internal, "grpc-event-stream" for external.
private SubscriberOverflowHandler CreateOverflowHandler(EventBackpressurePolicy policy)
{
GatewayMetrics metrics = _eventStreaming.Metrics;
string sessionId = SessionId;
return (isOnlySubscriber, isInternal) =>
{
// Label the overflow metric by subscriber kind. The distributor passes isInternal
// directly, so no heuristic is needed to distinguish an internal overflow (the
// gateway-owned dashboard mirror) from an external one (a gRPC streaming client).
string label = isInternal ? "dashboard-mirror" : "grpc-event-stream";
metrics.QueueOverflow(label);
if (policy == EventBackpressurePolicy.FailFast && isOnlySubscriber)
{
MarkFaulted($"Session {sessionId} event stream queue overflowed.");
metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
}
};
}
// The distributor's single event source. Drains the worker event stream once (the
// distributor guarantees a single consumer) and maps each frame to the public MxEvent,
// preserving worker order. Mirrors the former ProduceEventsAsync mapping exactly.
private async IAsyncEnumerable<MxEvent> MapWorkerEventsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
MxAccessGrpcMapper mapper = _eventStreaming.Mapper;
await foreach (WorkerEvent workerEvent in ReadEventsAsync(cancellationToken)
.ConfigureAwait(false))
{
yield return mapper.MapEvent(workerEvent);
}
}
/// <summary>
@@ -381,10 +678,15 @@ public sealed class GatewaySession
}
/// <summary>
/// Attaches an event subscriber and returns a disposable lease.
/// Attaches an event subscriber and returns a lease whose
/// <see cref="IEventSubscriberLease.Reader"/> reads the fanned public
/// <see cref="MxEvent"/>s for this subscriber. The single-subscriber guard
/// (Tasks 7/8 relax it) is unchanged: with multi-subscriber disabled a second
/// attach is rejected. The returned lease, when disposed, unregisters the
/// distributor subscriber AND decrements the active-subscriber count.
/// </summary>
/// <param name="allowMultipleSubscribers">If true, allows multiple concurrent event subscribers.</param>
public IDisposable AttachEventSubscriber(bool allowMultipleSubscribers)
public IEventSubscriberLease AttachEventSubscriber(bool allowMultipleSubscribers)
{
lock (_syncRoot)
{
@@ -403,7 +705,20 @@ public sealed class GatewaySession
}
_activeEventSubscriberCount++;
return new EventSubscriberLease(this);
}
// Construct/start the distributor and register this subscriber. Done outside the
// guard lock (StartDistributorAndRegister takes _syncRoot itself for construction).
// On any failure roll back the count we just took so the guard stays consistent.
try
{
IEventSubscriberLease distributorLease = StartDistributorAndRegister();
return new EventSubscriberLease(this, distributorLease);
}
catch
{
DetachEventSubscriber();
throw;
}
}
@@ -960,6 +1275,63 @@ public sealed class GatewaySession
{
}
// Stop the internal dashboard mirror first: cancel its loop, dispose its lease (which
// unregisters its internal distributor subscriber and completes its channel), and
// await the loop task. Done BEFORE disposing the distributor and worker client — like
// the distributor itself — so the mirror is no longer reading the pump when the pump
// and its source (the worker client) tear down.
IEventSubscriberLease? dashboardLease;
Task? dashboardTask;
CancellationTokenSource? dashboardCts;
lock (_syncRoot)
{
dashboardLease = _dashboardMirrorLease;
dashboardTask = _dashboardMirrorTask;
dashboardCts = _dashboardMirrorCts;
_dashboardMirrorLease = null;
_dashboardMirrorTask = null;
_dashboardMirrorCts = null;
}
if (dashboardCts is not null)
{
await dashboardCts.CancelAsync().ConfigureAwait(false);
}
dashboardLease?.Dispose();
if (dashboardTask is not null)
{
try
{
await dashboardTask.ConfigureAwait(false);
}
catch (Exception)
{
// The mirror loop swallows its own faults; any escape here must not block
// disposal. The loop has stopped, which is all teardown requires.
}
}
dashboardCts?.Dispose();
// Stop the event pump and complete every subscriber channel before tearing down the
// worker client (the pump's source). DisposeAsync is the single session teardown
// point (SessionManager.RemoveSessionAsync awaits it after close), so awaiting it
// here guarantees the distributor's pump task is observed and subscribers are
// completed rather than left dangling.
SessionEventDistributor? distributor;
lock (_syncRoot)
{
distributor = _eventDistributor;
_eventDistributor = null;
}
if (distributor is not null)
{
await distributor.DisposeAsync().ConfigureAwait(false);
}
if (_workerClient is not null)
{
await _workerClient.DisposeAsync().ConfigureAwait(false);
@@ -1101,22 +1473,30 @@ public sealed class GatewaySession
}
}
private sealed class EventSubscriberLease(GatewaySession session) : IDisposable
private sealed class EventSubscriberLease(GatewaySession session, IEventSubscriberLease distributorLease)
: IEventSubscriberLease
{
private bool _disposed;
// 0 = live, 1 = disposed. Interlocked so concurrent stream-completion +
// client-cancellation paths cannot both call DetachEventSubscriber and
// double-decrement _activeEventSubscriberCount to -1.
private int _leaseDisposed;
/// <inheritdoc />
public System.Threading.Channels.ChannelReader<MxEvent> Reader => distributorLease.Reader;
/// <summary>
/// Disposes the lease and detaches the event subscriber.
/// Disposes the lease: unregisters this subscriber from the distributor (completing
/// its channel) and decrements the session's active-subscriber count. Ordering is
/// not significant — the count guard and the distributor registration are
/// independent — but both must run exactly once.
/// </summary>
public void Dispose()
{
if (_disposed)
if (Interlocked.Exchange(ref _leaseDisposed, 1) == 0)
{
return;
distributorLease.Dispose();
session.DetachEventSubscriber();
}
session.DetachEventSubscriber();
_disposed = true;
}
}
}
@@ -0,0 +1,18 @@
using System.Threading.Channels;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// <summary>
/// A registration lease into a <see cref="SessionEventDistributor"/>. Exposes the
/// subscriber's own <see cref="ChannelReader{T}"/> of fanned events. Disposing the
/// lease unregisters the subscriber and completes its channel without disturbing the
/// pump or other subscribers.
/// </summary>
public interface IEventSubscriberLease : IDisposable
{
/// <summary>
/// Gets the reader for this subscriber's fanned event channel.
/// </summary>
ChannelReader<MxEvent> Reader { get; }
}
@@ -8,11 +8,13 @@ public interface ISessionManager
/// <summary>Opens a new gateway session and launches a worker process.</summary>
/// <param name="request">Request payload.</param>
/// <param name="clientIdentity">Client identity string.</param>
/// <param name="ownerKeyId">API key identifier of the caller creating the session.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The newly opened session.</returns>
Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
string? ownerKeyId,
CancellationToken cancellationToken);
/// <summary>Attempts to retrieve a session by ID.</summary>
@@ -0,0 +1,666 @@
using System.Collections.Concurrent;
using System.Threading.Channels;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// <summary>
/// Invoked by the pump (on the pump thread) when a subscriber's bounded channel is full
/// and the event cannot be written. The handler applies policy side-effects only:
/// it records the overflow metric and, in the legacy single-subscriber FailFast case,
/// faults the owning session. The handler MUST NOT complete the subscriber's channel —
/// the distributor performs the disconnect and channel-completion unconditionally,
/// regardless of what the handler does.
/// </summary>
/// <param name="isOnlySubscriber">
/// <see langword="true"/> when the overflowing subscriber is the sole registered
/// subscriber at the moment of overflow (legacy single-subscriber mode). FailFast faults
/// the session only in this case; with multiple subscribers FailFast degrades to a
/// per-subscriber disconnect so one slow consumer never faults a session shared by others.
/// Always <see langword="false"/> for internal subscribers (the dashboard mirror) because
/// <see cref="SessionEventDistributor"/> excludes them from the external-subscriber count.
/// </param>
/// <param name="isInternal">
/// <see langword="true"/> when the overflowing subscriber is the gateway-owned internal
/// dashboard mirror subscriber. The handler uses this to choose the correct metric label
/// (<c>"dashboard-mirror"</c> vs <c>"grpc-event-stream"</c>).
/// </param>
public delegate void SubscriberOverflowHandler(bool isOnlySubscriber, bool isInternal);
/// <summary>
/// Per-session event pump and fan-out. A single background task drains the
/// session's event source <em>exactly once</em> and fans each event out to
/// every currently-registered subscriber's own bounded channel.
/// </summary>
/// <remarks>
/// <para>
/// Introduced by Task 2 of the Session Resilience epic; the bounded replay ring
/// buffer was added by Task 3, it was wired into <c>GatewaySession</c> and
/// <c>EventStreamService</c> by Task 4, and the per-subscriber backpressure-isolation
/// policy (Task 5) is implemented here: a slow subscriber overflows only its own
/// bounded channel and the pump applies the policy to that subscriber alone (see
/// <see cref="SubscriberOverflowHandler"/> and <c>OnSubscriberOverflow</c>), leaving
/// the pump, the session, and other subscribers running. The class does not yet
/// remove the single-subscriber guard (Tasks 7/8). The ring buffer supports capacity
/// eviction (oldest entry dropped when the count exceeds
/// <c>replayBufferCapacity</c>) and age eviction (entries older than
/// <c>replayRetentionSeconds</c> dropped on the next append or query), and is
/// queried via <see cref="TryGetReplayFrom"/> by reconnecting subscribers.
/// </para>
/// <para>
/// <b>Source seam.</b> The event source is injected as a
/// <see cref="Func{T, TResult}"/> producing an
/// <see cref="IAsyncEnumerable{T}"/> of already-mapped public
/// <see cref="MxEvent"/>s, given a <see cref="CancellationToken"/>. This is the
/// cleanest seam for Task 4: it can pass
/// <c>ct =&gt; session.ReadEventsAsync(ct).Select(mapper.MapEvent)</c> (or a
/// channel reader's <c>ReadAllAsync</c>), while unit tests pass a plain
/// channel reader's <c>ReadAllAsync</c> with no real session. The pump owns the
/// single consumption of this enumerable; fan-out happens on the public
/// <see cref="MxEvent"/> after mapping, mirroring today's
/// <c>EventStreamService.ProduceEventsAsync</c> ordering.
/// </para>
/// <para>
/// <b>Concurrency.</b> The subscriber set is a
/// <see cref="ConcurrentDictionary{TKey, TValue}"/> keyed by a monotonic id.
/// The pump iterates it with a snapshot-free enumerator (which never throws on
/// concurrent add/remove), and <see cref="Register"/> / lease disposal mutate it
/// without any lock held across an <c>await</c>. Each subscriber channel has a
/// single writer — the pump — so per-channel writes never race. MXAccess parity:
/// events are fanned in the order received; the pump never reorders or
/// synthesizes events.
/// </para>
/// </remarks>
public sealed class SessionEventDistributor : IAsyncDisposable
{
/// <summary>
/// Bounded wait for the pump to stop during disposal. A source factory that
/// ignores cancellation must not hang dispose forever; after this window the
/// pump is abandoned and subscribers are completed anyway.
/// </summary>
private static readonly TimeSpan DefaultShutdownTimeout = TimeSpan.FromSeconds(5);
private readonly string _sessionId;
private readonly Func<CancellationToken, IAsyncEnumerable<MxEvent>> _eventSourceFactory;
private readonly int _subscriberQueueCapacity;
private readonly SubscriberOverflowHandler? _overflowHandler;
private readonly TimeSpan _shutdownTimeout;
private readonly ILogger<SessionEventDistributor> _logger;
private readonly TimeProvider _timeProvider;
private readonly ConcurrentDictionary<long, Subscriber> _subscribers = new();
private readonly CancellationTokenSource _shutdownCts = new();
private readonly object _lifecycleLock = new();
// Replay ring buffer. Appended on the pump thread and queried from arbitrary
// threads via TryGetReplayFrom, so every access is under _replayLock. The deque
// keeps events in ascending WorkerSequence order (the pump fans in source order),
// so the oldest retained event is always at the front. Capacity == 0 disables
// retention; RetentionSeconds <= 0 disables age-based eviction.
private readonly int _replayBufferCapacity;
private readonly TimeSpan _replayRetention;
private readonly bool _ageEvictionEnabled;
private readonly LinkedList<ReplayEntry> _replayBuffer = new();
private readonly object _replayLock = new();
private bool _anyEventSeen;
private ulong _highestSequenceSeen;
private long _nextSubscriberId;
private Task? _pumpTask;
private bool _started;
private bool _disposed;
/// <summary>
/// Initializes a per-session event distributor.
/// </summary>
/// <param name="sessionId">Owning session id, used only for logging context.</param>
/// <param name="eventSourceFactory">
/// Factory producing the session's event stream given a cancellation token.
/// The pump consumes this exactly once. See the type remarks for the seam Task 4
/// plugs into.
/// </param>
/// <param name="subscriberQueueCapacity">
/// Bounded capacity of each per-subscriber channel. Mirrors the gRPC event-stream
/// queue capacity shape used today.
/// </param>
/// <param name="logger">Logger for pump lifecycle diagnostics.</param>
/// <remarks>
/// This overload disables the replay ring buffer (capacity 0). Use the overload
/// taking replay parameters to retain events for reconnect/reattach replay.
/// Kept <c>internal</c> so production wiring (Task 4) cannot accidentally use
/// the no-replay path; tests reach it via <c>InternalsVisibleTo</c>.
/// </remarks>
internal SessionEventDistributor(
string sessionId,
Func<CancellationToken, IAsyncEnumerable<MxEvent>> eventSourceFactory,
int subscriberQueueCapacity,
ILogger<SessionEventDistributor> logger,
SubscriberOverflowHandler? overflowHandler = null)
: this(
sessionId,
eventSourceFactory,
subscriberQueueCapacity,
replayBufferCapacity: 0,
replayRetentionSeconds: 0,
logger,
TimeProvider.System,
overflowHandler)
{
}
/// <summary>
/// Initializes a per-session event distributor with a bounded replay ring buffer.
/// </summary>
/// <param name="sessionId">Owning session id, used only for logging context.</param>
/// <param name="eventSourceFactory">
/// Factory producing the session's event stream given a cancellation token.
/// The pump consumes this exactly once. See the type remarks for the seam Task 4
/// plugs into.
/// </param>
/// <param name="subscriberQueueCapacity">
/// Bounded capacity of each per-subscriber channel. Mirrors the gRPC event-stream
/// queue capacity shape used today.
/// </param>
/// <param name="replayBufferCapacity">
/// Maximum number of events retained for replay. The oldest retained event is
/// evicted once this count is exceeded. <c>0</c> disables retention entirely.
/// </param>
/// <param name="replayRetentionSeconds">
/// Maximum age, in seconds, of a retained event. Entries older than this are
/// evicted regardless of capacity. <c>0</c> (or less) disables age-based eviction.
/// </param>
/// <param name="logger">Logger for pump lifecycle diagnostics.</param>
/// <param name="timeProvider">
/// Clock used to timestamp and age-evict replay entries. Inject a fake to make
/// age-eviction deterministic in tests.
/// </param>
/// <param name="overflowHandler">
/// Optional per-subscriber backpressure handler invoked when a subscriber's bounded
/// channel is full. It records the overflow metric and, for the legacy
/// single-subscriber FailFast case, faults the owning session. The distributor always
/// disconnects the offending subscriber with an overflow fault regardless of the
/// handler. When <see langword="null"/> (unit/skeleton use) the offending subscriber is
/// still disconnected but no metric/fault side effect runs.
/// </param>
public SessionEventDistributor(
string sessionId,
Func<CancellationToken, IAsyncEnumerable<MxEvent>> eventSourceFactory,
int subscriberQueueCapacity,
int replayBufferCapacity,
double replayRetentionSeconds,
ILogger<SessionEventDistributor> logger,
TimeProvider timeProvider,
SubscriberOverflowHandler? overflowHandler = null)
{
ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
ArgumentNullException.ThrowIfNull(eventSourceFactory);
ArgumentOutOfRangeException.ThrowIfLessThan(subscriberQueueCapacity, 1);
ArgumentOutOfRangeException.ThrowIfNegative(replayBufferCapacity);
ArgumentOutOfRangeException.ThrowIfNegative(replayRetentionSeconds);
ArgumentNullException.ThrowIfNull(logger);
ArgumentNullException.ThrowIfNull(timeProvider);
_sessionId = sessionId;
_eventSourceFactory = eventSourceFactory;
_subscriberQueueCapacity = subscriberQueueCapacity;
_overflowHandler = overflowHandler;
_shutdownTimeout = DefaultShutdownTimeout;
_replayBufferCapacity = replayBufferCapacity;
_ageEvictionEnabled = replayRetentionSeconds > 0;
_replayRetention = _ageEvictionEnabled
? TimeSpan.FromSeconds(replayRetentionSeconds)
: TimeSpan.Zero;
_logger = logger;
_timeProvider = timeProvider;
}
/// <summary>
/// Gets the count of currently-registered subscribers.
/// </summary>
public int SubscriberCount => _subscribers.Count;
/// <summary>
/// Starts the background pump. Idempotent — a second call is a no-op.
/// </summary>
/// <param name="cancellationToken">Token observed only while starting.</param>
public Task StartAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
lock (_lifecycleLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
if (_started)
{
return Task.CompletedTask;
}
_started = true;
_pumpTask = Task.Run(() => PumpAsync(_shutdownCts.Token), CancellationToken.None);
}
return Task.CompletedTask;
}
/// <summary>
/// Registers a new subscriber and returns its lease. The lease exposes the
/// subscriber's <see cref="ChannelReader{T}"/> and, when disposed, unregisters the
/// subscriber and completes its channel without disturbing the pump or other
/// subscribers.
/// </summary>
/// <param name="isInternal">
/// <see langword="true"/> for a gateway-owned internal subscriber (Task 6: the
/// session's dashboard mirror) that must NOT participate in the single-subscriber
/// overflow accounting. An internal subscriber is excluded from the
/// <c>isOnlySubscriber</c> count, so a lone external gRPC subscriber still reports
/// <c>isOnlySubscriber == true</c> (preserving legacy FailFast session-fault
/// behavior) even while the dashboard subscriber is attached; and an internal
/// subscriber that itself overflows always reports <c>isOnlySubscriber == false</c>,
/// so a slow/broken dashboard can never fault the session — it is merely
/// disconnected from the mirror. Defaults to <see langword="false"/> (external
/// subscriber) so every existing call site is unchanged.
/// </param>
public IEventSubscriberLease Register(bool isInternal = false)
{
// The pump is the single writer for this channel; readers are single-consumer
// (one gRPC stream / dashboard subscriber). Synchronous continuations are
// disabled so a slow reader can never stall the pump on its completion.
//
// The pump MUST stay non-blocking: it writes with the non-blocking TryWrite so one
// slow reader can never stall the single pump that feeds every subscriber. FullMode
// is deliberately Wait — NOT because the pump ever blocks (it never calls the blocking
// WriteAsync overload), but because Wait is the only BoundedChannelFullMode under
// which TryWrite returns false when the channel is full. That false return IS the
// overflow signal the pump needs to apply the per-subscriber backpressure policy. The
// Drop* modes would make TryWrite silently succeed-and-drop, hiding overflow and
// re-introducing the silent data loss this task removes. So: Wait mode + TryWrite =
// a non-blocking pump that still detects a full subscriber channel.
Channel<MxEvent> channel = Channel.CreateBounded<MxEvent>(
new BoundedChannelOptions(_subscriberQueueCapacity)
{
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
long id = Interlocked.Increment(ref _nextSubscriberId);
Subscriber subscriber = new(id, channel, isInternal);
// The disposed check AND the map add happen under the same lock with no await
// in between. DisposeAsync sets _disposed=true under this same lock before it
// calls CompleteAllSubscribers, so once disposal has begun no further subscriber
// can be added — closing the Register-after-DisposeAsync window that would
// otherwise leave a subscriber's channel never completed.
lock (_lifecycleLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
_subscribers[id] = subscriber;
}
return new SubscriberLease(this, subscriber);
}
/// <summary>
/// Stops the pump and completes all subscriber channels. Idempotent.
/// </summary>
public async ValueTask DisposeAsync()
{
Task? pumpTask;
lock (_lifecycleLock)
{
if (_disposed)
{
return;
}
_disposed = true;
pumpTask = _pumpTask;
}
// Signal the pump to stop. It must not block on a non-reading subscriber:
// it writes with non-blocking TryWrite, so cancellation tears it down promptly.
await _shutdownCts.CancelAsync().ConfigureAwait(false);
if (pumpTask is not null)
{
// Bound the wait: a source factory that ignores cancellation would otherwise
// hang dispose forever. If the pump does not stop in time we log and proceed
// to complete subscribers anyway; DisposeAsync must not throw on this path.
Task completed = await Task.WhenAny(pumpTask, Task.Delay(_shutdownTimeout)).ConfigureAwait(false);
if (!ReferenceEquals(completed, pumpTask))
{
_logger.LogWarning(
"Event distributor pump did not stop within {ShutdownTimeoutSeconds}s for session {SessionId}; completing subscribers and abandoning the pump.",
_shutdownTimeout.TotalSeconds,
_sessionId);
}
else
{
try
{
await pumpTask.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
_logger.LogDebug(
exception,
"Event distributor pump faulted during shutdown for session {SessionId}.",
_sessionId);
}
}
}
CompleteAllSubscribers(error: null);
_shutdownCts.Dispose();
}
private async Task PumpAsync(CancellationToken cancellationToken)
{
try
{
await foreach (MxEvent mxEvent in _eventSourceFactory(cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
// Retain for replay BEFORE fan-out so a reconnecting subscriber that
// queries between fan-out and its own read still sees this event. Order
// is preserved: the pump is the single appender and events arrive in
// source order.
AppendToReplayBuffer(mxEvent);
// Enumerating a ConcurrentDictionary's Values never throws on concurrent
// add/remove; a subscriber registered mid-iteration may miss this event,
// which matches "late subscribers see events after they register".
foreach (Subscriber subscriber in _subscribers.Values)
{
// Non-blocking write: TryWrite never blocks the pump on a slow reader.
// A false return means this subscriber's bounded channel is full — the
// per-subscriber overflow signal. We apply the backpressure policy to
// THIS subscriber only; the pump, the session, and every other subscriber
// keep running. Logs identifiers (worker sequence, subscriber id, session)
// only, never the event payload or tag values.
if (!subscriber.Channel.Writer.TryWrite(mxEvent))
{
OnSubscriberOverflow(subscriber, mxEvent.WorkerSequence);
}
}
}
CompleteAllSubscribers(error: null);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Shutdown path: DisposeAsync completes subscribers.
}
catch (Exception exception)
{
// Unexpected source fault (not the shutdown-cancellation path above) — visible
// by default so an event stream silently dying is not lost in Debug noise.
_logger.LogError(
exception,
"Event distributor source faulted for session {SessionId}.",
_sessionId);
CompleteAllSubscribers(exception);
}
}
// Applies the per-subscriber backpressure policy when a subscriber's bounded channel is
// full. Runs on the pump thread. The offending subscriber is ALWAYS disconnected with an
// overflow fault and unregistered, so it can never wedge the pump again; the overflow
// handler decides the observable side effects (overflow metric, and — for legacy
// single-subscriber FailFast — faulting the owning session). Multi-subscriber FailFast
// intentionally degrades to a plain disconnect (see SubscriberOverflowHandler docs): one
// slow consumer must not fault a session shared by other healthy subscribers.
private void OnSubscriberOverflow(Subscriber subscriber, ulong workerSequence)
{
// Snapshot whether this is the sole subscriber BEFORE we unregister it. This drives
// the FailFast-fault-session-vs-disconnect decision: FailFast only faults the session
// when the overflowing subscriber is the sole subscriber.
//
// This snapshot is safe in v1 because AllowMultipleEventSubscribers=false is enforced
// by the validator and the single-subscriber guard in AttachEventSubscriber — a
// concurrent second registration is impossible, so the false-FailFast race (two
// subscribers, one overflows, Count reads as 1 after the other concurrently unregisters,
// FailFast wrongly faults the session) cannot occur today.
//
// REVISIT (Task 7/8): when multi-subscriber is enabled the guard is removed and the
// race window opens — a concurrent second registration could cause Count to read as 1
// here even with two subscribers, producing a false FailFast that faults a shared
// session. Resolve before enabling multi-subscriber.
//
// Task 6: the gateway-owned internal dashboard subscriber is excluded from this
// accounting. (a) An internal subscriber that overflows is NEVER the "only subscriber"
// — a slow/broken dashboard must never fault the session, only disconnect its own
// mirror. (b) Internal subscribers are excluded from the count, so a lone external
// gRPC subscriber still reports isOnlySubscriber==true and preserves the legacy
// FailFast session-fault behavior even while the dashboard mirror is attached.
bool isOnlySubscriber = !subscriber.IsInternal && CountExternalSubscribers() == 1;
_logger.LogDebug(
"Event distributor disconnecting subscriber {SubscriberId} in session {SessionId} after queue overflow (worker sequence {WorkerSequence}).",
subscriber.Id,
_sessionId,
workerSequence);
// Observability + session-fault decision. Errors here must not stall the pump or
// leave the subscriber attached, so the disconnect below runs regardless.
// Pass subscriber.IsInternal so the handler can choose the correct metric label.
try
{
_overflowHandler?.Invoke(isOnlySubscriber, subscriber.IsInternal);
}
catch (Exception exception)
{
_logger.LogError(
exception,
"Event distributor overflow handler threw for session {SessionId}; disconnecting subscriber {SubscriberId} anyway.",
_sessionId,
subscriber.Id);
}
// Disconnect ONLY this subscriber: complete its channel with the overflow fault and
// remove it from the fan-out set. Its gRPC reader's MoveNextAsync then throws the
// SessionManagerException, which EventStreamService surfaces to the client exactly as
// the pre-epic per-RPC overflow did. The pump and every other subscriber are untouched.
if (_subscribers.TryRemove(subscriber.Id, out _))
{
subscriber.Channel.Writer.TryComplete(new SessionManagerException(
SessionManagerErrorCode.EventQueueOverflow,
$"Session {_sessionId} event stream queue overflowed."));
}
}
// Counts external (non-internal) subscribers. Drives the isOnlySubscriber FailFast
// decision so the gateway-owned internal dashboard subscriber never inflates the count.
private int CountExternalSubscribers()
{
int count = 0;
foreach (Subscriber subscriber in _subscribers.Values)
{
if (!subscriber.IsInternal)
{
count++;
}
}
return count;
}
private void CompleteAllSubscribers(Exception? error)
{
foreach (Subscriber subscriber in _subscribers.Values)
{
subscriber.Channel.Writer.TryComplete(error);
}
}
private void Unregister(Subscriber subscriber)
{
if (_subscribers.TryRemove(subscriber.Id, out _))
{
subscriber.Channel.Writer.TryComplete();
}
}
/// <summary>
/// Returns the retained events with <see cref="MxEvent.WorkerSequence"/> strictly
/// greater than <paramref name="afterSequence"/>, in ascending sequence order, so a
/// reconnecting or reattaching subscriber can replay what it missed.
/// </summary>
/// <param name="afterSequence">
/// The last worker sequence the caller already observed. Only events newer than this
/// are returned.
/// </param>
/// <param name="events">
/// The retained events newer than <paramref name="afterSequence"/>, in order. Never
/// null; empty when nothing newer is retained.
/// </param>
/// <param name="gap">
/// <see langword="true"/> when events between <paramref name="afterSequence"/> and the
/// oldest retained event were already evicted (by capacity or age), meaning the caller
/// missed events that can no longer be replayed and must re-snapshot. When
/// <see langword="true"/>, whatever IS still retained is still returned via
/// <paramref name="events"/>.
/// </param>
/// <returns>
/// Always <see langword="true"/> — the out parameters fully describe the result. The
/// return value exists for a fluent call shape and future extension.
/// </returns>
/// <remarks>
/// <para>Gap semantics, by buffer state:</para>
/// <list type="bullet">
/// <item>
/// Buffer non-empty: <paramref name="gap"/> is <see langword="true"/> iff
/// <paramref name="afterSequence"/> is below the oldest retained sequence minus
/// one (i.e. at least one event newer than <paramref name="afterSequence"/> but
/// older than the oldest retained was evicted). When
/// <paramref name="afterSequence"/> equals or exceeds the newest retained
/// sequence the caller is fully caught up: empty list, no gap.
/// </item>
/// <item>
/// Buffer empty (retention disabled, nothing seen yet, or everything evicted):
/// empty list, and <paramref name="gap"/> is <see langword="true"/> iff
/// <paramref name="afterSequence"/> is below the highest sequence ever seen —
/// i.e. the caller is behind but nothing is retained to replay. If no event has
/// ever been seen, or the caller is already at/ahead of the highest seen, there
/// is nothing to miss: no gap.
/// </item>
/// </list>
/// </remarks>
public bool TryGetReplayFrom(ulong afterSequence, out IReadOnlyList<MxEvent> events, out bool gap)
{
lock (_replayLock)
{
EvictAged();
if (_replayBuffer.Count == 0)
{
events = [];
// Nothing retained. The caller missed events only if it is behind the
// highest sequence ever seen (and we have seen at least one event).
gap = _anyEventSeen && afterSequence < _highestSequenceSeen;
return true;
}
ulong oldestRetained = _replayBuffer.First!.Value.Event.WorkerSequence;
// A gap exists when at least one event newer than afterSequence was evicted,
// i.e. afterSequence sits below the oldest-retained-minus-one boundary.
// Written as (oldestRetained > 0 && afterSequence < oldestRetained - 1) to
// avoid wrapping when afterSequence == ulong.MaxValue (afterSequence + 1
// would overflow to 0, falsely reporting a gap).
gap = oldestRetained > 0 && afterSequence < oldestRetained - 1;
// O(n) scan over the retained buffer — acceptable because TryGetReplayFrom
// is only called on subscriber reconnect, never on the hot fan-out path.
List<MxEvent> newer = [];
foreach (ReplayEntry entry in _replayBuffer)
{
if (entry.Event.WorkerSequence > afterSequence)
{
newer.Add(entry.Event);
}
}
events = newer;
return true;
}
}
private void AppendToReplayBuffer(MxEvent mxEvent)
{
lock (_replayLock)
{
_anyEventSeen = true;
if (mxEvent.WorkerSequence > _highestSequenceSeen)
{
_highestSequenceSeen = mxEvent.WorkerSequence;
}
// Capacity 0 disables retention: track the highest-seen sequence (so replay
// can still report a gap) but keep no events.
if (_replayBufferCapacity == 0)
{
return;
}
_replayBuffer.AddLast(new ReplayEntry(mxEvent, _timeProvider.GetUtcNow()));
// Capacity eviction: drop oldest until within bound.
while (_replayBuffer.Count > _replayBufferCapacity)
{
_replayBuffer.RemoveFirst();
}
EvictAged();
}
}
// Must be called under _replayLock. Drops entries older than the retention window.
private void EvictAged()
{
if (!_ageEvictionEnabled || _replayBuffer.Count == 0)
{
return;
}
DateTimeOffset cutoff = _timeProvider.GetUtcNow() - _replayRetention;
while (_replayBuffer.First is { } first && first.Value.RetainedAt < cutoff)
{
_replayBuffer.RemoveFirst();
}
}
private readonly record struct ReplayEntry(MxEvent Event, DateTimeOffset RetainedAt);
private sealed class Subscriber(long id, Channel<MxEvent> channel, bool isInternal)
{
public long Id { get; } = id;
public Channel<MxEvent> Channel { get; } = channel;
// True for the gateway-owned internal dashboard subscriber. Excluded from the
// single-subscriber overflow accounting so it cannot fault the session.
public bool IsInternal { get; } = isInternal;
}
private sealed class SubscriberLease(SessionEventDistributor distributor, Subscriber subscriber)
: IEventSubscriberLease
{
private int _leaseDisposed;
public ChannelReader<MxEvent> Reader => subscriber.Channel.Reader;
public void Dispose()
{
// Atomic check-and-set so concurrent Dispose calls unregister at most once.
if (Interlocked.Exchange(ref _leaseDisposed, 1) == 0)
{
distributor.Unregister(subscriber);
}
}
}
}
@@ -0,0 +1,60 @@
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
namespace ZB.MOM.WW.MxGateway.Server.Sessions;
/// <summary>
/// Dependencies a <see cref="GatewaySession"/> needs to construct and own its
/// <see cref="SessionEventDistributor"/>. Bundled so the session constructor stays a
/// single optional parameter rather than four, and so unit tests that build a session
/// directly get a working distributor from <see cref="Default"/> without wiring DI.
/// </summary>
/// <param name="Mapper">
/// Maps worker IPC <c>WorkerEvent</c> frames to public <c>MxEvent</c>s. The distributor
/// pump applies this once per event in worker order, mirroring the mapping
/// <c>EventStreamService.ProduceEventsAsync</c> used before Task 4.
/// </param>
/// <param name="EventOptions">
/// Supplies the distributor's per-subscriber queue capacity and replay ring-buffer
/// bounds (<see cref="EventOptions.QueueCapacity"/>,
/// <see cref="EventOptions.ReplayBufferCapacity"/>,
/// <see cref="EventOptions.ReplayRetentionSeconds"/>).
/// </param>
/// <param name="DistributorLogger">Logger for the distributor pump lifecycle.</param>
/// <param name="TimeProvider">Clock used to timestamp and age-evict replay entries.</param>
/// <param name="Metrics">
/// Gateway metrics sink used by the session's per-subscriber overflow handler to record
/// the queue-overflow counter and, for legacy single-subscriber FailFast, the session
/// fault. Carrying it here keeps the distributor decoupled from the metrics type while
/// preserving the observability the pre-epic per-RPC overflow path emitted.
/// </param>
/// <param name="DashboardBroadcaster">
/// Sink the session's internal dashboard mirror loop (Task 6) publishes raw session
/// <c>MxEvent</c>s to. When non-null the session registers an internal distributor
/// subscriber on becoming Ready and mirrors every fanned event to the dashboard
/// EventsHub group regardless of whether a gRPC client is streaming. When null
/// (unit tests that don't exercise the dashboard mirror) no mirror is started.
/// </param>
public sealed record SessionEventStreaming(
MxAccessGrpcMapper Mapper,
EventOptions EventOptions,
ILogger<SessionEventDistributor> DistributorLogger,
TimeProvider TimeProvider,
GatewayMetrics Metrics,
IDashboardEventBroadcaster? DashboardBroadcaster = null)
{
/// <summary>
/// Defaults used when a session is constructed without explicit streaming
/// dependencies (unit tests). Uses a fresh mapper, default event options, a no-op
/// logger, the system clock, a fresh metrics sink, and no dashboard mirror.
/// </summary>
public static SessionEventStreaming Default { get; } = new(
new MxAccessGrpcMapper(),
new EventOptions(),
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System,
new GatewayMetrics());
}
@@ -25,6 +25,9 @@ public sealed class SessionManager : ISessionManager
private readonly ILogger<SessionManager> _logger;
private readonly GatewayOptions _options;
private readonly SemaphoreSlim _sessionSlots;
private readonly Grpc.MxAccessGrpcMapper _eventMapper;
private readonly ILogger<SessionEventDistributor> _distributorLogger;
private readonly Dashboard.Hubs.IDashboardEventBroadcaster? _dashboardEventBroadcaster;
/// <summary>
/// Initializes a new instance of <see cref="SessionManager"/>.
@@ -35,13 +38,24 @@ public sealed class SessionManager : ISessionManager
/// <param name="metrics">Gateway metrics.</param>
/// <param name="timeProvider">Time provider for timestamps.</param>
/// <param name="logger">Logger.</param>
/// <param name="eventMapper">Mapper used by each session's event distributor to map worker events to public events.</param>
/// <param name="distributorLogger">Logger passed to each session's event distributor pump.</param>
/// <param name="dashboardEventBroadcaster">
/// Dashboard SignalR fan-out sink. Each session registers an internal distributor
/// subscriber (Task 6) that mirrors raw session events to this broadcaster, so the
/// dashboard receives events regardless of whether a gRPC client is streaming. Null in
/// unit tests that do not exercise the dashboard mirror.
/// </param>
public SessionManager(
ISessionRegistry registry,
ISessionWorkerClientFactory workerClientFactory,
IOptions<GatewayOptions> options,
GatewayMetrics metrics,
TimeProvider? timeProvider = null,
ILogger<SessionManager>? logger = null)
ILogger<SessionManager>? logger = null,
Grpc.MxAccessGrpcMapper? eventMapper = null,
ILogger<SessionEventDistributor>? distributorLogger = null,
Dashboard.Hubs.IDashboardEventBroadcaster? dashboardEventBroadcaster = null)
{
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
_workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory));
@@ -49,6 +63,9 @@ public sealed class SessionManager : ISessionManager
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? NullLogger<SessionManager>.Instance;
_eventMapper = eventMapper ?? new Grpc.MxAccessGrpcMapper();
_distributorLogger = distributorLogger ?? NullLogger<SessionEventDistributor>.Instance;
_dashboardEventBroadcaster = dashboardEventBroadcaster;
_options = options.Value;
_sessionSlots = new SemaphoreSlim(_options.Sessions.MaxSessions, _options.Sessions.MaxSessions);
}
@@ -58,11 +75,13 @@ public sealed class SessionManager : ISessionManager
/// </summary>
/// <param name="request">Session open request.</param>
/// <param name="clientIdentity">Client authentication identity.</param>
/// <param name="ownerKeyId">API key identifier of the caller creating the session.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Opened gateway session.</returns>
public async Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
string? ownerKeyId,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(request);
@@ -72,7 +91,7 @@ public sealed class SessionManager : ISessionManager
bool sessionOpenedRecorded = false;
try
{
session = CreateSession(request, clientIdentity);
session = CreateSession(request, clientIdentity, ownerKeyId);
if (!_registry.TryAdd(session))
{
throw new SessionManagerException(
@@ -420,7 +439,8 @@ public sealed class SessionManager : ISessionManager
private GatewaySession CreateSession(
SessionOpenRequest request,
string? clientIdentity)
string? clientIdentity,
string? ownerKeyId)
{
string sessionId = CreateSessionId();
string backendName = string.IsNullOrWhiteSpace(request.RequestedBackend)
@@ -435,19 +455,29 @@ public sealed class SessionManager : ISessionManager
DateTimeOffset openedAt = _timeProvider.GetUtcNow();
string clientCorrelationId = CreateClientCorrelationId(request.ClientSessionName, sessionId);
SessionEventStreaming eventStreaming = new(
_eventMapper,
_options.Events,
_distributorLogger,
_timeProvider,
_metrics,
_dashboardEventBroadcaster);
return new GatewaySession(
sessionId,
backendName,
pipeName,
nonce,
clientIdentity,
ownerKeyId,
request.ClientSessionName,
clientCorrelationId,
commandTimeout,
startupTimeout,
shutdownTimeout,
leaseDuration,
openedAt);
openedAt,
eventStreaming);
}
private static string CreateClientCorrelationId(
@@ -46,11 +46,14 @@
"MaxPendingCommandsPerSession": 128,
"DefaultLeaseSeconds": 1800,
"LeaseSweepIntervalSeconds": 30,
"AllowMultipleEventSubscribers": false
"AllowMultipleEventSubscribers": false,
"MaxEventSubscribersPerSession": 8
},
"Events": {
"QueueCapacity": 10000,
"BackpressurePolicy": "FailFast"
"BackpressurePolicy": "FailFast",
"ReplayBufferCapacity": 1024,
"ReplayRetentionSeconds": 300
},
"Dashboard": {
"Enabled": true,
@@ -410,6 +410,7 @@ public sealed class AlarmFailoverEndToEndTests
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
string? ownerKeyId,
CancellationToken cancellationToken)
{
GatewaySession session = new(
@@ -711,6 +711,7 @@ public sealed class GatewayAlarmMonitorProviderModeTests
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
string? ownerKeyId,
CancellationToken cancellationToken)
{
GatewaySession session = new(
@@ -31,9 +31,11 @@ public sealed class GatewayOptionsTests
Assert.Equal(30, options.Sessions.DefaultCommandTimeoutSeconds);
Assert.Equal(64, options.Sessions.MaxSessions);
Assert.Equal(128, options.Sessions.MaxPendingCommandsPerSession);
Assert.Equal(1800, options.Sessions.DefaultLeaseSeconds);
Assert.Equal(30, options.Sessions.LeaseSweepIntervalSeconds);
Assert.False(options.Sessions.AllowMultipleEventSubscribers);
Assert.Equal(8, options.Sessions.MaxEventSubscribersPerSession);
Assert.Equal(10_000, options.Events.QueueCapacity);
Assert.Equal(EventBackpressurePolicy.FailFast, options.Events.BackpressurePolicy);
@@ -289,4 +289,71 @@ public sealed class GatewayOptionsValidatorTests
Assert.True(result.Failed);
Assert.Contains(result.Failures!, f => f.Contains(keyPart));
}
// -------------------------------------------------------------------------
// AllowMultipleEventSubscribers / MaxEventSubscribersPerSession validation
// -------------------------------------------------------------------------
private static GatewayOptions CloneWithSessions(GatewayOptions source, SessionOptions sessions)
=> new()
{
Authentication = source.Authentication,
Ldap = source.Ldap,
Worker = source.Worker,
Sessions = sessions,
Events = source.Events,
Dashboard = source.Dashboard,
Protocol = source.Protocol,
Alarms = source.Alarms,
Tls = source.Tls,
};
[Fact]
public void Validate_Succeeds_WhenAllowMultipleEventSubscribersIsTrue()
{
// AllowMultipleEventSubscribers=true must now validate cleanly (no longer rejected).
GatewayOptions options = CloneWithSessions(
ValidOptions(),
new SessionOptions { AllowMultipleEventSubscribers = true });
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
Assert.True(result.Succeeded);
}
[Theory]
[InlineData(0)]
[InlineData(-1)]
public void Validate_Fails_WhenMaxEventSubscribersPerSessionBelowOne(int value)
{
GatewayOptions options = CloneWithSessions(
ValidOptions(),
new SessionOptions { MaxEventSubscribersPerSession = value });
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
Assert.True(result.Failed);
Assert.Contains(
result.Failures!,
f => f.Contains("MxGateway:Sessions:MaxEventSubscribersPerSession"));
}
[Theory]
[InlineData(1)]
[InlineData(8)]
[InlineData(32)]
public void Validate_Succeeds_WhenMaxEventSubscribersPerSessionIsPositive(int value)
{
GatewayOptions options = CloneWithSessions(
ValidOptions(),
new SessionOptions { MaxEventSubscribersPerSession = value });
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
Assert.True(result.Succeeded);
}
[Fact]
public void Validate_Succeeds_WithDefaultSessionOptions()
{
// Default SessionOptions (AllowMultipleEventSubscribers=false, MaxEventSubscribersPerSession=8)
// must validate cleanly.
GatewayOptions options = CloneWithSessions(ValidOptions(), new SessionOptions());
ValidateOptionsResult result = new GatewayOptionsValidator().Validate(null, options);
Assert.True(result.Succeeded);
}
}
@@ -244,6 +244,7 @@ public sealed class DashboardSessionAdminServiceTests
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
string? ownerKeyId,
CancellationToken cancellationToken)
{
throw new NotSupportedException();
@@ -268,15 +268,13 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests
workerClientFactory,
options,
_metrics,
logger: NullLogger<SessionManager>.Instance);
logger: NullLogger<SessionManager>.Instance,
dashboardEventBroadcaster: NullDashboardEventBroadcaster.Instance);
MxAccessGrpcMapper mapper = new();
EventStreamService eventStreamService = new(
sessionManager,
options,
mapper,
_metrics,
NullDashboardEventBroadcaster.Instance,
NullLogger<EventStreamService>.Instance);
_metrics);
Service = new MxAccessGatewayService(
sessionManager,
@@ -9,7 +9,6 @@ using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Grpc;
@@ -157,59 +156,99 @@ public sealed class EventStreamServiceTests
await WaitUntilAsync(() => metrics.GetSnapshot().GrpcEventStreamQueueDepth == 0);
}
/// <summary>Verifies that event queue overflow faults the session and reports the overflow metric.</summary>
/// <summary>
/// Re-targeted in Task 5: a per-subscriber channel overflow in the session's
/// <see cref="SessionEventDistributor"/> faults the whole session under the legacy
/// single-subscriber FailFast policy (the default, single-subscriber mode) and records
/// the overflow + fault metrics. The distributor completes this subscriber's channel
/// with the overflow fault, which surfaces here as the same
/// <see cref="SessionManagerErrorCode.EventQueueOverflow"/> the pre-epic per-RPC
/// overflow produced.
/// </summary>
[Fact]
public async Task StreamEventsAsync_WhenStreamQueueOverflows_FaultsSessionAndReportsOverflow()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
using GatewayMetrics metrics = new();
GatewaySession session = CreateReadySession(
workerClient,
queueCapacity: 1,
metrics: metrics,
backpressurePolicy: EventBackpressurePolicy.FailFast);
EventStreamService service = CreateService(
new FakeSessionManager(session),
metrics,
queueCapacity: 1);
workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 3, MxEventFamily.OnDataChange));
for (ulong sequence = 1; sequence <= 50; sequence++)
{
workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
}
workerClient.CompleteAfterConfiguredEvents = true;
await using IAsyncEnumerator<MxEvent> subscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
Assert.True(await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
await WaitUntilAsync(() => session.State == SessionState.Faulted);
// The pump fans 50 events into a capacity-1 subscriber channel faster than this
// single reader drains, so one of the reads observes the terminal overflow fault.
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
async () =>
{
while (await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout))
{
}
});
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, exception.ErrorCode);
await WaitUntilAsync(() => session.State == SessionState.Faulted);
Assert.Equal(SessionState.Faulted, session.State);
Assert.Equal(1, metrics.GetSnapshot().QueueOverflows);
Assert.Equal(1, metrics.GetSnapshot().Faults);
GatewayMetricsSnapshot snapshot = metrics.GetSnapshot();
Assert.Equal(1, snapshot.QueueOverflows);
Assert.Equal(1, snapshot.Faults);
// The finally block in StreamEventsAsync calls StreamDisconnected("Detached") on the
// overflow+fault path too; pin it here so a regression removing that call is caught.
Assert.Equal(1, snapshot.StreamDisconnects);
}
/// <summary>Verifies that the disconnect backpressure policy disconnects the subscriber without faulting the session.</summary>
/// <summary>
/// Re-targeted in Task 5: under the DisconnectSubscriber policy a per-subscriber
/// channel overflow disconnects only that subscriber's stream (terminal
/// <see cref="SessionManagerErrorCode.EventQueueOverflow"/>) and records the overflow
/// metric, but leaves the session <see cref="SessionState.Ready"/> and records no
/// fault. The session, pump, and any other subscribers are unaffected.
/// </summary>
[Fact]
public async Task StreamEventsAsync_WhenStreamQueueOverflowsWithDisconnectPolicy_LeavesSessionReady()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
using GatewayMetrics metrics = new();
GatewaySession session = CreateReadySession(
workerClient,
queueCapacity: 1,
metrics: metrics,
backpressurePolicy: EventBackpressurePolicy.DisconnectSubscriber);
EventStreamService service = CreateService(
new FakeSessionManager(session),
metrics,
queueCapacity: 1,
backpressurePolicy: EventBackpressurePolicy.DisconnectSubscriber);
workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 3, MxEventFamily.OnDataChange));
for (ulong sequence = 1; sequence <= 50; sequence++)
{
workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange));
}
workerClient.CompleteAfterConfiguredEvents = true;
await using IAsyncEnumerator<MxEvent> subscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
Assert.True(await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
async () =>
{
while (await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout))
{
}
});
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, exception.ErrorCode);
Assert.Equal(SessionState.Ready, session.State);
@@ -261,81 +300,11 @@ public sealed class EventStreamServiceTests
Assert.Equal(1, metrics.GetSnapshot().Faults);
}
/// <summary>
/// Tests-026 regression: <see cref="EventStreamService.StreamEventsAsync"/>
/// must mirror every yielded event to the
/// <see cref="ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster"/>
/// seam (the only path that fans events out to dashboard SignalR clients).
/// A regression that silently dropped the <c>Publish</c> call — e.g. an
/// <c>if</c> accidentally added around it, or the broadcaster ctor
/// parameter being removed — would have produced no failing test before
/// this fixture existed. The recording fake captures every call and we
/// assert one publish per yielded event, with the correct session id and
/// preserved <c>WorkerSequence</c>.
/// </summary>
[Fact]
public async Task StreamEventsAsync_PublishesEachEventToDashboardBroadcaster()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
RecordingDashboardEventBroadcaster recordingBroadcaster = new();
EventStreamService service = CreateService(
new FakeSessionManager(session),
dashboardEventBroadcaster: recordingBroadcaster);
workerClient.Events.Add(CreateWorkerEvent(sequence: 7, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 8, MxEventFamily.OnWriteComplete));
workerClient.CompleteAfterConfiguredEvents = true;
List<MxEvent> events = await CollectEventsAsync(service, session.SessionId);
Assert.Equal([7UL, 8UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray());
IReadOnlyList<DashboardEventCapture> captures = recordingBroadcaster.Captures;
Assert.Equal(2, captures.Count);
Assert.All(captures, capture => Assert.Equal(session.SessionId, capture.SessionId));
Assert.Equal([7UL, 8UL], captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray());
Assert.Equal(MxEventFamily.OnDataChange, captures[0].MxEvent.Family);
Assert.Equal(MxEventFamily.OnWriteComplete, captures[1].MxEvent.Family);
}
/// <summary>
/// Server-041 regression: <see cref="EventStreamService"/> must not
/// abort the gRPC stream when the dashboard broadcaster throws.
/// <c>IDashboardEventBroadcaster.Publish</c> is documented as
/// best-effort and never-throw, but the gRPC consumer cannot rely on
/// implementation discipline alone — the seam itself swallows the
/// fault and logs at debug, mirroring the broadcaster's own
/// continuation handler. Without the wrap, the producer loop would
/// surface the exception and the client would see a faulted stream
/// for a dashboard-mirror failure.
/// </summary>
[Fact]
public async Task StreamEventsAsync_WhenDashboardBroadcasterThrows_StillYieldsEventsAndDoesNotFaultSession()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
using GatewayMetrics metrics = new();
ThrowingDashboardEventBroadcaster throwingBroadcaster = new();
EventStreamService service = CreateService(
new FakeSessionManager(session),
metrics,
dashboardEventBroadcaster: throwingBroadcaster);
workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
workerClient.CompleteAfterConfiguredEvents = true;
List<MxEvent> events = await CollectEventsAsync(service, session.SessionId);
Assert.Equal([1UL, 2UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray());
Assert.Equal(2, throwingBroadcaster.PublishAttempts);
Assert.NotEqual(SessionState.Faulted, session.State);
}
private static EventStreamService CreateService(
FakeSessionManager sessionManager,
GatewayMetrics? metrics = null,
int queueCapacity = 8,
EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast,
ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster? dashboardEventBroadcaster = null)
EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast)
{
return new EventStreamService(
sessionManager,
@@ -347,25 +316,7 @@ public sealed class EventStreamServiceTests
BackpressurePolicy = backpressurePolicy,
},
}),
new MxAccessGrpcMapper(),
metrics ?? new GatewayMetrics(),
dashboardEventBroadcaster ?? NullDashboardEventBroadcaster.Instance,
NullLogger<EventStreamService>.Instance);
}
private sealed class ThrowingDashboardEventBroadcaster : ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster
{
/// <summary>Gets the count of publish attempts.</summary>
public int PublishAttempts { get; private set; }
/// <summary>Increments the attempt count and throws a simulated failure.</summary>
/// <param name="sessionId">The session identifier.</param>
/// <param name="mxEvent">The event to publish.</param>
public void Publish(string sessionId, MxEvent mxEvent)
{
PublishAttempts++;
throw new InvalidOperationException("simulated dashboard broadcaster failure");
}
metrics ?? new GatewayMetrics());
}
private static async Task<List<MxEvent>> CollectEventsAsync(
@@ -393,20 +344,39 @@ public sealed class EventStreamServiceTests
private static GatewaySession CreateReadySession(
FakeWorkerClient workerClient,
string sessionId = "session-events")
string sessionId = "session-events",
int queueCapacity = 8,
GatewayMetrics? metrics = null,
EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast)
{
// The per-subscriber overflow policy now lives in the session's
// SessionEventDistributor, so the session must share the same metrics sink and
// backpressure policy the overflow assertions observe. queueCapacity flows into the
// distributor's per-subscriber channel bound, which is what overflows.
GatewaySession session = new(
sessionId,
GatewayContractInfo.DefaultBackendName,
"pipe",
"nonce",
"client",
ownerKeyId: null,
"client-session",
"client-correlation",
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(10),
DateTimeOffset.UtcNow);
TimeSpan.FromMinutes(30),
DateTimeOffset.UtcNow,
new SessionEventStreaming(
new MxAccessGrpcMapper(),
new EventOptions
{
QueueCapacity = queueCapacity,
BackpressurePolicy = backpressurePolicy,
},
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System,
metrics ?? new GatewayMetrics()));
session.AttachWorkerClient(workerClient);
session.MarkReady();
@@ -471,6 +441,7 @@ public sealed class EventStreamServiceTests
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
string? ownerKeyId,
CancellationToken cancellationToken)
{
return Task.FromResult(_sessions.Values.First());
@@ -884,10 +884,12 @@ public sealed class MxAccessGatewayServiceConstraintTests
/// <summary>Opens a test session asynchronously.</summary>
/// <param name="request">The session open request.</param>
/// <param name="clientIdentity">The client identity, if any.</param>
/// <param name="ownerKeyId">The API key identifier of the caller, if any.</param>
/// <param name="cancellationToken">Token to observe for cancellation.</param>
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
string? ownerKeyId,
CancellationToken cancellationToken) =>
Task.FromResult(seededSessions.Values.First());
@@ -45,6 +45,7 @@ public sealed class MxAccessGatewayServiceTests
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
Assert.Contains("unary-invoke", reply.Capabilities);
Assert.Equal("Operator Key", sessionManager.LastClientIdentity);
Assert.Equal("operator01", sessionManager.LastOwnerKeyId);
Assert.Equal("operator-session", sessionManager.LastOpenRequest?.ClientSessionName);
}
@@ -508,6 +509,9 @@ public sealed class MxAccessGatewayServiceTests
/// <summary>The last client identity passed to OpenSessionAsync.</summary>
public string? LastClientIdentity { get; private set; }
/// <summary>The last owner key id passed to OpenSessionAsync.</summary>
public string? LastOwnerKeyId { get; private set; }
/// <summary>The last session ID passed to ReadEventsAsync.</summary>
public string? LastReadEventsSessionId { get; private set; }
@@ -545,10 +549,12 @@ public sealed class MxAccessGatewayServiceTests
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
string? ownerKeyId,
CancellationToken cancellationToken)
{
LastOpenRequest = request;
LastClientIdentity = clientIdentity;
LastOwnerKeyId = ownerKeyId;
return Task.FromResult(OpenSessionResult ?? CreateSession("session-1", processId: 1234));
}
@@ -0,0 +1,323 @@
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.MxGateway.Contracts;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;
/// <summary>
/// Task 6 regression tests for the internal dashboard mirror. The dashboard is a
/// first-class subscriber on the session's <see cref="SessionEventDistributor"/>, so it
/// receives session events whether or not a gRPC client is streaming — fixing the
/// "dark feed" where the dashboard only saw events while a gRPC client was actively
/// streaming (the inline per-RPC tap removed by this task).
/// </summary>
public sealed class GatewaySessionDashboardMirrorTests
{
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
/// <summary>
/// The KEY bug-fix test: the dashboard broadcaster receives session events even when
/// NO gRPC <c>StreamEvents</c> subscriber is attached. The session is driven to Ready
/// with a fake worker emitting events; only the internal dashboard subscriber exists.
/// Before Task 6 the mirror lived inside the per-RPC gRPC loop, so with no gRPC
/// subscriber the dashboard saw nothing.
/// </summary>
[Fact]
public async Task DashboardMirror_ReceivesEvents_WithNoGrpcSubscriber()
{
FakeWorkerClient workerClient = new();
workerClient.Events.Add(CreateWorkerEvent(10, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(11, MxEventFamily.OnWriteComplete));
workerClient.CompleteAfterConfiguredEvents = true;
RecordingDashboardEventBroadcaster broadcaster = new();
await using GatewaySession session = CreateSession(workerClient, broadcaster);
session.AttachWorkerClient(workerClient);
// MarkReady starts the internal dashboard mirror; no gRPC subscriber is ever attached.
session.MarkReady();
await WaitUntilAsync(() => broadcaster.Captures.Count == 2);
IReadOnlyList<DashboardEventCapture> captures = broadcaster.Captures;
Assert.Equal(0, session.ActiveEventSubscriberCount);
Assert.Equal([10UL, 11UL], captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray());
Assert.All(captures, capture => Assert.Equal(session.SessionId, capture.SessionId));
}
/// <summary>
/// A gRPC subscriber and the dashboard both receive every event concurrently. The
/// gRPC path is no longer the dashboard's source — both read independent leases fed by
/// the single distributor pump.
/// </summary>
[Fact]
public async Task DashboardMirror_AndGrpcSubscriber_BothReceiveEvents()
{
FakeWorkerClient workerClient = new();
workerClient.Events.Add(CreateWorkerEvent(1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(2, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(3, MxEventFamily.OnWriteComplete));
workerClient.CompleteAfterConfiguredEvents = true;
RecordingDashboardEventBroadcaster broadcaster = new();
await using GatewaySession session = CreateSession(workerClient, broadcaster);
session.AttachWorkerClient(workerClient);
session.MarkReady();
EventStreamService service = new(
new SingleSessionManager(session),
Options.Create(new GatewayOptions { Events = new EventOptions { QueueCapacity = 8 } }),
new GatewayMetrics());
List<MxEvent> grpcEvents = [];
await foreach (MxEvent mxEvent in service
.StreamEventsAsync(new StreamEventsRequest { SessionId = session.SessionId }, CancellationToken.None)
.WithCancellation(CancellationToken.None))
{
grpcEvents.Add(mxEvent);
}
await WaitUntilAsync(() => broadcaster.Captures.Count == 3);
Assert.Equal([1UL, 2UL, 3UL], grpcEvents.Select(mxEvent => mxEvent.WorkerSequence).ToArray());
Assert.Equal([1UL, 2UL, 3UL], broadcaster.Captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray());
}
/// <summary>
/// Task 4 hazard guard: starting the pump at Ready with a fast-completing worker stream
/// and zero subscribers used to drain into nothing and leave a later subscriber hanging.
/// Now the dashboard subscriber is registered BEFORE the pump starts, so even a worker
/// stream that completes immediately delivers every event to the dashboard with no hang.
/// </summary>
[Fact]
public async Task DashboardMirror_FastCompletingWorkerStream_DeliversAllEventsWithoutHang()
{
FakeWorkerClient workerClient = new();
workerClient.Events.Add(CreateWorkerEvent(1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(2, MxEventFamily.OnDataChange));
workerClient.CompleteAfterConfiguredEvents = true;
RecordingDashboardEventBroadcaster broadcaster = new();
await using GatewaySession session = CreateSession(workerClient, broadcaster);
session.AttachWorkerClient(workerClient);
session.MarkReady();
await WaitUntilAsync(() => broadcaster.Captures.Count == 2);
Assert.Equal([1UL, 2UL], broadcaster.Captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray());
}
/// <summary>
/// The dashboard Publish must be never-throw at the seam too: a throwing broadcaster
/// must not fault the session or stop the mirror from continuing past the failure.
/// </summary>
[Fact]
public async Task DashboardMirror_WhenBroadcasterThrows_DoesNotFaultSessionAndKeepsMirroring()
{
FakeWorkerClient workerClient = new();
workerClient.Events.Add(CreateWorkerEvent(1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(2, MxEventFamily.OnDataChange));
workerClient.CompleteAfterConfiguredEvents = true;
ThrowingDashboardEventBroadcaster broadcaster = new();
await using GatewaySession session = CreateSession(workerClient, broadcaster);
session.AttachWorkerClient(workerClient);
session.MarkReady();
await WaitUntilAsync(() => broadcaster.PublishAttempts == 2);
Assert.NotEqual(SessionState.Faulted, session.State);
}
/// <summary>
/// The internal dashboard subscriber must NOT count against the single-subscriber
/// guard: a gRPC subscriber can still attach while the dashboard mirror is running.
/// </summary>
[Fact]
public async Task DashboardMirror_DoesNotCountAgainstSingleSubscriberGuard()
{
FakeWorkerClient workerClient = new();
RecordingDashboardEventBroadcaster broadcaster = new();
await using GatewaySession session = CreateSession(workerClient, broadcaster);
session.AttachWorkerClient(workerClient);
session.MarkReady();
Assert.Equal(0, session.ActiveEventSubscriberCount);
using IEventSubscriberLease lease = session.AttachEventSubscriber(allowMultipleSubscribers: false);
Assert.Equal(1, session.ActiveEventSubscriberCount);
}
private static GatewaySession CreateSession(
IWorkerClient workerClient,
IDashboardEventBroadcaster broadcaster)
{
return new GatewaySession(
sessionId: "session-dashboard-mirror",
backendName: GatewayContractInfo.DefaultBackendName,
pipeName: "mxaccess-gateway-1-session-dashboard-mirror",
nonce: "nonce",
clientIdentity: "client-1",
ownerKeyId: null,
clientSessionName: "test-session",
clientCorrelationId: "client-correlation-1",
commandTimeout: TimeSpan.FromSeconds(5),
startupTimeout: TimeSpan.FromSeconds(5),
shutdownTimeout: TimeSpan.FromSeconds(5),
leaseDuration: TimeSpan.FromMinutes(30),
openedAt: DateTimeOffset.UtcNow,
eventStreaming: new SessionEventStreaming(
new MxAccessGrpcMapper(),
new EventOptions { QueueCapacity = 8 },
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System,
new GatewayMetrics(),
broadcaster));
}
private static WorkerEvent CreateWorkerEvent(ulong sequence, MxEventFamily family)
{
MxEvent mxEvent = new()
{
SessionId = "session-dashboard-mirror",
Family = family,
WorkerSequence = sequence,
};
switch (family)
{
case MxEventFamily.OnDataChange:
mxEvent.OnDataChange = new OnDataChangeEvent();
break;
case MxEventFamily.OnWriteComplete:
mxEvent.OnWriteComplete = new OnWriteCompleteEvent();
break;
}
return new WorkerEvent { Event = mxEvent };
}
private static async Task WaitUntilAsync(Func<bool> predicate, [CallerArgumentExpression(nameof(predicate))] string? condition = null)
{
using CancellationTokenSource cancellationTokenSource = new(TestTimeout);
try
{
while (!predicate())
{
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
}
}
catch (OperationCanceledException)
{
Assert.Fail($"Timed out after {TestTimeout.TotalSeconds}s waiting for: {condition}");
}
}
private sealed class ThrowingDashboardEventBroadcaster : IDashboardEventBroadcaster
{
private int _publishAttempts;
public int PublishAttempts => Volatile.Read(ref _publishAttempts);
public void Publish(string sessionId, MxEvent mxEvent)
{
Interlocked.Increment(ref _publishAttempts);
throw new InvalidOperationException("simulated dashboard broadcaster failure");
}
}
private sealed class SingleSessionManager(GatewaySession session) : ISessionManager
{
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
string? ownerKeyId,
CancellationToken cancellationToken) => Task.FromResult(session);
public bool TryGetSession(string sessionId, out GatewaySession gatewaySession)
{
gatewaySession = session;
return string.Equals(sessionId, session.SessionId, StringComparison.Ordinal);
}
public Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken) => Task.FromResult(new WorkerCommandReply());
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken) => session.ReadEventsAsync(cancellationToken);
public Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken) =>
Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
public Task<SessionCloseResult> KillWorkerAsync(
string sessionId,
string reason,
CancellationToken cancellationToken) =>
Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
public Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken) => Task.FromResult(0);
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
private sealed class FakeWorkerClient : IWorkerClient
{
public List<WorkerEvent> Events { get; } = [];
public bool CompleteAfterConfiguredEvents { get; set; }
public string SessionId { get; } = "session-dashboard-mirror";
public int? ProcessId { get; } = 1234;
public WorkerClientState State { get; } = WorkerClientState.Ready;
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken) => Task.FromResult(new WorkerCommandReply());
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
foreach (WorkerEvent workerEvent in Events)
{
cancellationToken.ThrowIfCancellationRequested();
yield return workerEvent;
}
if (CompleteAfterConfiguredEvents)
{
yield break;
}
await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
}
public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask;
public void Kill(string reason)
{
}
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
}
@@ -1,5 +1,9 @@
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
@@ -156,6 +160,66 @@ public sealed class GatewaySessionTests
await session.DisposeAsync();
}
/// <summary>
/// Issue-1 regression. Concurrent <c>Dispose()</c> calls on the same
/// <see cref="IEventSubscriberLease"/> — as can happen when a gRPC stream
/// completion and a client cancellation both fire at the same time — must
/// decrement <c>_activeEventSubscriberCount</c> exactly once, never to 1.
/// A negative count permanently blocks future subscribers because
/// <c>AttachEventSubscriber(allowMultipleSubscribers:false)</c> gates on
/// <c>_activeEventSubscriberCount > 0</c>. After both racing disposes finish,
/// the count must be exactly 0 and a subsequent single-subscriber attach must
/// succeed.
/// </summary>
[Fact]
public async Task EventSubscriberLease_ConcurrentDispose_DecrementsCountExactlyOnce()
{
const int Concurrency = 16;
const int Iterations = 200;
TimeSpan testTimeout = TimeSpan.FromSeconds(10);
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySessionWithEventStreaming(workerClient);
for (int i = 0; i < Iterations; i++)
{
// Attach one subscriber; this increments _activeEventSubscriberCount to 1.
IEventSubscriberLease lease = session.AttachEventSubscriber(
allowMultipleSubscribers: false);
// Race Concurrency threads all calling Dispose() on the same lease.
// Only one must actually run DetachEventSubscriber.
using SemaphoreSlim gate = new(0);
Task[] tasks = new Task[Concurrency];
for (int t = 0; t < Concurrency; t++)
{
tasks[t] = Task.Run(async () =>
{
// All threads wait at the gate so they start as simultaneously
// as the scheduler allows, maximising the race window.
await gate.WaitAsync(testTimeout);
lease.Dispose();
});
}
gate.Release(Concurrency);
await Task.WhenAll(tasks).WaitAsync(testTimeout);
// Count must be exactly 0 — not negative — after all disposes.
Assert.Equal(0, session.ActiveEventSubscriberCount);
// Observable contract: a fresh single subscriber must now be attachable
// (i.e., the guard _activeEventSubscriberCount > 0 is false).
IEventSubscriberLease next = session.AttachEventSubscriber(
allowMultipleSubscribers: false);
next.Dispose();
Assert.Equal(0, session.ActiveEventSubscriberCount);
}
await session.CloseAsync("test-done", CancellationToken.None);
await session.DisposeAsync();
}
private static GatewaySession CreateReadySession(IWorkerClient workerClient)
{
GatewaySession session = new(
@@ -164,6 +228,7 @@ public sealed class GatewaySessionTests
pipeName: "mxaccess-gateway-1-session-test",
nonce: "nonce",
clientIdentity: "client-1",
ownerKeyId: null,
clientSessionName: "test-session",
clientCorrelationId: "client-correlation-1",
commandTimeout: TimeSpan.FromSeconds(5),
@@ -176,6 +241,33 @@ public sealed class GatewaySessionTests
return session;
}
private static GatewaySession CreateReadySessionWithEventStreaming(IWorkerClient workerClient)
{
GatewaySession session = new(
sessionId: "session-test-concurrent",
backendName: "mxaccess",
pipeName: "mxaccess-gateway-1-session-test-concurrent",
nonce: "nonce",
clientIdentity: "client-1",
ownerKeyId: null,
clientSessionName: "test-session",
clientCorrelationId: "client-correlation-1",
commandTimeout: TimeSpan.FromSeconds(5),
startupTimeout: TimeSpan.FromSeconds(5),
shutdownTimeout: TimeSpan.FromSeconds(5),
leaseDuration: TimeSpan.FromMinutes(30),
openedAt: DateTimeOffset.UtcNow,
eventStreaming: new SessionEventStreaming(
new MxAccessGrpcMapper(),
new EventOptions { QueueCapacity = 8 },
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System,
new GatewayMetrics()));
session.AttachWorkerClient(workerClient);
session.MarkReady();
return session;
}
/// <summary>
/// Minimal worker client that parks <see cref="ShutdownAsync"/> until the test
/// explicitly releases it. Used to keep <see cref="GatewaySession.CloseAsync"/>
@@ -0,0 +1,569 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Time.Testing;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Sessions;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;
/// <summary>
/// Concurrency and fan-out tests for <see cref="SessionEventDistributor"/>, the
/// Session Resilience epic's per-session event pump. One pump drains the source
/// exactly once and fans every event to N independent per-subscriber channels.
/// Every async wait is bounded so a fan-out or shutdown deadlock fails fast.
/// </summary>
public sealed class SessionEventDistributorTests
{
private static readonly TimeSpan ReadTimeout = TimeSpan.FromSeconds(5);
[Fact]
public async Task TwoSubscribers_BothReceiveFannedEventsInOrder()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease leaseA = distributor.Register();
using IEventSubscriberLease leaseB = distributor.Register();
source.Writer.TryWrite(Event(1));
source.Writer.TryWrite(Event(2));
MxEvent a1 = await ReadOneAsync(leaseA.Reader);
MxEvent a2 = await ReadOneAsync(leaseA.Reader);
MxEvent b1 = await ReadOneAsync(leaseB.Reader);
MxEvent b2 = await ReadOneAsync(leaseB.Reader);
Assert.Equal(1ul, a1.WorkerSequence);
Assert.Equal(2ul, a2.WorkerSequence);
Assert.Equal(1ul, b1.WorkerSequence);
Assert.Equal(2ul, b2.WorkerSequence);
}
[Fact]
public async Task DisposingOneLease_StopsItsDelivery_OtherKeepsReceiving()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
await distributor.StartAsync(CancellationToken.None);
IEventSubscriberLease leaseA = distributor.Register();
using IEventSubscriberLease leaseB = distributor.Register();
source.Writer.TryWrite(Event(1));
_ = await ReadOneAsync(leaseA.Reader);
_ = await ReadOneAsync(leaseB.Reader);
leaseA.Dispose();
// A's reader must complete (no more delivery) after dispose.
await AssertCompletedAsync(leaseA.Reader);
// B still receives subsequent events.
source.Writer.TryWrite(Event(2));
MxEvent b2 = await ReadOneAsync(leaseB.Reader);
Assert.Equal(2ul, b2.WorkerSequence);
}
[Fact]
public async Task SubscriberRegisteredAfterStart_ReceivesEventsEmittedAfterRegistration()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(source.Reader);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease leaseA = distributor.Register();
source.Writer.TryWrite(Event(1));
_ = await ReadOneAsync(leaseA.Reader);
// Late subscriber: only sees events emitted after it registered.
using IEventSubscriberLease leaseB = distributor.Register();
source.Writer.TryWrite(Event(2));
MxEvent b = await ReadOneAsync(leaseB.Reader);
Assert.Equal(2ul, b.WorkerSequence);
}
[Fact]
public async Task DisposingDistributor_CompletesAllSubscriberChannels_AndStopsPump()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
SessionEventDistributor distributor = CreateDistributor(source.Reader);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease leaseA = distributor.Register();
using IEventSubscriberLease leaseB = distributor.Register();
// Bounded so a shutdown hang fails fast.
await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout);
await AssertCompletedAsync(leaseA.Reader);
await AssertCompletedAsync(leaseB.Reader);
}
[Fact]
public async Task Register_AfterDispose_ThrowsObjectDisposedException()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
SessionEventDistributor distributor = CreateDistributor(source.Reader);
await distributor.StartAsync(CancellationToken.None);
await distributor.DisposeAsync().AsTask().WaitAsync(ReadTimeout);
Assert.Throws<ObjectDisposedException>(() => distributor.Register());
}
[Fact]
public async Task ReplayBuffer_OverCapacity_EvictsOldestFirst_AndReportsGap()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(
source.Reader,
replayBufferCapacity: 3,
replayRetentionSeconds: 0);
await distributor.StartAsync(CancellationToken.None);
// A live subscriber forces the pump to fan (and thereby retain) each event,
// and gives us a deterministic point to know the pump has processed event 5.
using IEventSubscriberLease lease = distributor.Register();
for (ulong sequence = 1; sequence <= 5; sequence++)
{
source.Writer.TryWrite(Event(sequence));
}
for (ulong sequence = 1; sequence <= 5; sequence++)
{
MxEvent e = await ReadOneAsync(lease.Reader);
Assert.Equal(sequence, e.WorkerSequence);
}
// Capacity 3 retains only the newest three: sequences 3, 4, 5. Events 1 and 2
// were evicted, so a caller asking from 0 missed events => gap=true, and it
// gets only the retained tail.
bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);
Assert.True(found);
Assert.True(gap);
Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
}
[Fact]
public async Task ReplayBuffer_WithinRetainedWindow_ReturnsNewerEvents_NoGap()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(
source.Reader,
replayBufferCapacity: 10,
replayRetentionSeconds: 0);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease lease = distributor.Register();
for (ulong sequence = 1; sequence <= 5; sequence++)
{
source.Writer.TryWrite(Event(sequence));
_ = await ReadOneAsync(lease.Reader);
}
// afterSequence 2 is still inside the retained window [1..5], so no gap and
// exactly the newer events 3, 4, 5 come back.
bool found = distributor.TryGetReplayFrom(2, out IReadOnlyList<MxEvent> replay, out bool gap);
Assert.True(found);
Assert.False(gap);
Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
}
[Fact]
public async Task ReplayBuffer_AgedEntries_AreEvictedAfterRetentionElapses()
{
FakeTimeProvider time = new(new DateTimeOffset(2026, 1, 1, 0, 0, 0, TimeSpan.Zero));
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(
source.Reader,
replayBufferCapacity: 100,
replayRetentionSeconds: 30,
timeProvider: time);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease lease = distributor.Register();
// Two old events, then advance the clock well past the retention window.
source.Writer.TryWrite(Event(1));
source.Writer.TryWrite(Event(2));
_ = await ReadOneAsync(lease.Reader);
_ = await ReadOneAsync(lease.Reader);
time.Advance(TimeSpan.FromSeconds(60));
// A fresh event triggers age-eviction of the now-stale entries 1 and 2.
source.Writer.TryWrite(Event(3));
_ = await ReadOneAsync(lease.Reader);
bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);
Assert.True(found);
// Events 1 and 2 aged out; only 3 remains, and 0 predates the oldest retained.
Assert.Equal(new ulong[] { 3 }, replay.Select(e => e.WorkerSequence));
Assert.True(gap);
}
[Fact]
public async Task ReplayBuffer_AfterSequenceNewerThanAllRetained_ReturnsEmpty_NoGap()
{
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(
source.Reader,
replayBufferCapacity: 10,
replayRetentionSeconds: 0);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease lease = distributor.Register();
for (ulong sequence = 1; sequence <= 3; sequence++)
{
source.Writer.TryWrite(Event(sequence));
_ = await ReadOneAsync(lease.Reader);
}
// afterSequence 3 is at/after the newest retained; nothing newer, and the
// caller is fully caught up => empty list, gap=false.
bool found = distributor.TryGetReplayFrom(3, out IReadOnlyList<MxEvent> replay, out bool gap);
Assert.True(found);
Assert.False(gap);
Assert.Empty(replay);
}
[Fact]
public async Task ReplayBuffer_Capacity0_AfterSequenceBelowHighestSeen_ReportsGap_NoEvents()
{
// Disabled buffer: events are tracked for the highest-seen counter but not
// retained. A caller behind the highest-seen sequence must be told to re-snapshot.
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(
source.Reader,
replayBufferCapacity: 0,
replayRetentionSeconds: 0);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease lease = distributor.Register();
for (ulong sequence = 1; sequence <= 3; sequence++)
{
source.Writer.TryWrite(Event(sequence));
_ = await ReadOneAsync(lease.Reader);
}
// afterSequence=1 is below highestSeen=3 — gap, nothing to replay.
bool found = distributor.TryGetReplayFrom(1, out IReadOnlyList<MxEvent> replay, out bool gap);
Assert.True(found);
Assert.True(gap);
Assert.Empty(replay);
}
[Fact]
public async Task ReplayBuffer_Capacity0_AfterSequenceAtOrAboveHighestSeen_NoGap_NoEvents()
{
// Disabled buffer: caller is already caught up — no gap, nothing to replay.
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(
source.Reader,
replayBufferCapacity: 0,
replayRetentionSeconds: 0);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease lease = distributor.Register();
for (ulong sequence = 1; sequence <= 3; sequence++)
{
source.Writer.TryWrite(Event(sequence));
_ = await ReadOneAsync(lease.Reader);
}
// afterSequence=3 equals highestSeen — caller is fully caught up.
bool found = distributor.TryGetReplayFrom(3, out IReadOnlyList<MxEvent> replay, out bool gap);
Assert.True(found);
Assert.False(gap);
Assert.Empty(replay);
}
[Fact]
public async Task ReplayBuffer_NoEventsSeen_AnyAfterSequence_NoGap_NoEvents()
{
// No events ever seen: nothing can have been missed, so gap must be false.
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(
source.Reader,
replayBufferCapacity: 0,
replayRetentionSeconds: 0);
// Pump not started — no events arrive.
bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);
Assert.True(found);
Assert.False(gap);
Assert.Empty(replay);
}
[Fact]
public async Task ReplayBuffer_AfterSequenceMaxValue_WithRetainedEvents_NoGap_NoNewEvents()
{
// ulong.MaxValue as afterSequence: afterSequence + 1 would wrap to 0, which the
// old code used to compare against oldestRetained, falsely reporting gap=true.
// The corrected formula must yield gap=false and an empty replay list.
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
await using SessionEventDistributor distributor = CreateDistributor(
source.Reader,
replayBufferCapacity: 10,
replayRetentionSeconds: 0);
await distributor.StartAsync(CancellationToken.None);
using IEventSubscriberLease lease = distributor.Register();
source.Writer.TryWrite(Event(1));
_ = await ReadOneAsync(lease.Reader);
bool found = distributor.TryGetReplayFrom(ulong.MaxValue, out IReadOnlyList<MxEvent> replay, out bool gap);
Assert.True(found);
Assert.False(gap);
Assert.Empty(replay);
}
[Fact]
public async Task SlowSubscriberOverflow_DisconnectsOnlyThatSubscriber_PumpAndOtherKeepRunning()
{
// Per-subscriber backpressure isolation (Task 5): one subscriber stops reading and
// overflows its own tiny channel; it is disconnected with an EventQueueOverflow fault
// while a second, healthy subscriber keeps receiving and the pump keeps pumping.
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
int overflowCalls = 0;
// Separate fields for the bool value and the "set" flag so both can use
// Volatile.Read/Write; bool? is not valid for the volatile keyword on a local.
// Interlocked.Increment on the pump thread is the store for overflowCalls;
// Volatile.Read/Write provide ordering for observedIsOnlySubscriber.
int observedIsOnlySubscriberSet = 0;
bool observedIsOnlySubscriberValue = false;
await using SessionEventDistributor distributor = new(
"session-test",
ct => source.Reader.ReadAllAsync(ct),
subscriberQueueCapacity: 2,
replayBufferCapacity: 1024,
replayRetentionSeconds: 0,
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System,
(isOnlySubscriber, _) =>
{
Interlocked.Increment(ref overflowCalls);
Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber);
Volatile.Write(ref observedIsOnlySubscriberSet, 1);
});
await distributor.StartAsync(CancellationToken.None);
// Slow subscriber: registered but never read, so its capacity-2 channel fills.
using IEventSubscriberLease slow = distributor.Register();
// Healthy subscriber: drains promptly throughout.
using IEventSubscriberLease healthy = distributor.Register();
// Push more events than the slow subscriber's channel can hold while the healthy one
// keeps up. The slow channel overflows; the healthy channel does not.
for (ulong sequence = 1; sequence <= 10; sequence++)
{
source.Writer.TryWrite(Event(sequence));
MxEvent received = await ReadOneAsync(healthy.Reader);
Assert.Equal(sequence, received.WorkerSequence);
}
// The slow subscriber is disconnected with the overflow fault.
SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
async () => await DrainUntilFaultAsync(slow.Reader));
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
// Two subscribers were registered at overflow time, so isOnlySubscriber is false.
// Use Interlocked.Read / Volatile.Read so the test-thread reads are ordered after the
// pump-thread writes, avoiding a data race by the C# memory model.
Assert.Equal(1, Volatile.Read(ref overflowCalls));
Assert.Equal(1, Volatile.Read(ref observedIsOnlySubscriberSet));
Assert.False(Volatile.Read(ref observedIsOnlySubscriberValue));
Assert.Equal(1, distributor.SubscriberCount);
// The pump is still running and the healthy subscriber still receives new events.
source.Writer.TryWrite(Event(11));
MxEvent afterOverflow = await ReadOneAsync(healthy.Reader);
Assert.Equal(11ul, afterOverflow.WorkerSequence);
}
[Fact]
public async Task SlowSubscriberOverflow_WithMultipleSubscribers_HandlerSeesIsOnlySubscriberFalse_OtherKeepsReceiving()
{
// Distributor-level pin for "FailFast with multiple subscribers degrades to
// disconnect-only (no session fault)": when the overflowing subscriber is NOT the
// sole subscriber, isOnlySubscriber is false, so a FailFast-wired handler must NOT
// fault the session. This test drives the distributor directly (without GatewaySession)
// with two subscribers and a FailFast-style overflow handler seam, overflows the slow
// one, and asserts (a) isOnlySubscriber==false, (b) the other subscriber keeps
// receiving, and (c) the pump keeps running — all without a GatewaySession.
//
// TODO(Task 8): add a GatewaySession-level "session stays Ready" assertion once
// multi-subscriber config is enabled by the Tasks 7/8 validator/guard change.
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
bool handlerFiredWithFalse = false;
bool sessionFaultWouldBeCalled = false; // tracks if a FailFast path would fault
await using SessionEventDistributor distributor = new(
"session-multi-sub",
ct => source.Reader.ReadAllAsync(ct),
subscriberQueueCapacity: 2,
replayBufferCapacity: 0,
replayRetentionSeconds: 0,
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System,
(isOnlySubscriber, _) =>
{
if (!isOnlySubscriber)
{
// Multi-subscriber: FailFast degrades to disconnect-only.
Volatile.Write(ref handlerFiredWithFalse, true);
}
else
{
// Single-subscriber: FailFast would fault the session — must not happen here.
Volatile.Write(ref sessionFaultWouldBeCalled, true);
}
});
await distributor.StartAsync(CancellationToken.None);
// Slow subscriber: never reads, so capacity-2 channel overflows quickly.
using IEventSubscriberLease slow = distributor.Register();
// Healthy subscriber: drains every event promptly.
using IEventSubscriberLease healthy = distributor.Register();
// Drive enough events to overflow the slow subscriber's channel.
for (ulong sequence = 1; sequence <= 10; sequence++)
{
source.Writer.TryWrite(Event(sequence));
_ = await ReadOneAsync(healthy.Reader);
}
// Slow subscriber is disconnected with the overflow fault.
SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
async () => await DrainUntilFaultAsync(slow.Reader));
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
// The handler saw isOnlySubscriber==false (multi-subscriber degradation path).
Assert.True(Volatile.Read(ref handlerFiredWithFalse));
// The FailFast session-fault branch was NOT taken (session stays Ready equivalent).
Assert.False(Volatile.Read(ref sessionFaultWouldBeCalled));
// The pump and healthy subscriber are unaffected.
source.Writer.TryWrite(Event(11));
MxEvent afterOverflow = await ReadOneAsync(healthy.Reader);
Assert.Equal(11ul, afterOverflow.WorkerSequence);
}
[Fact]
public async Task InternalSubscriberOverflow_HandlerSeesIsOnlySubscriberFalse_ProvingCountExcludesInternal()
{
// Issue 3: verifies that CountExternalSubscribers() excludes the internal dashboard
// subscriber, so a FailFast policy would NOT fault the session even when the internal
// subscriber is the ONLY registered subscriber. The overflow handler receives
// isOnlySubscriber==false (not true) because the overflowing subscriber is internal
// and is therefore excluded from the external-subscriber count.
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
int observedIsOnlySubscriberSet = 0;
bool observedIsOnlySubscriberValue = false;
bool observedIsInternalValue = false;
await using SessionEventDistributor distributor = new(
"session-internal-overflow",
ct => source.Reader.ReadAllAsync(ct),
subscriberQueueCapacity: 2,
replayBufferCapacity: 0,
replayRetentionSeconds: 0,
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System,
(isOnlySubscriber, isInternal) =>
{
Volatile.Write(ref observedIsOnlySubscriberValue, isOnlySubscriber);
Volatile.Write(ref observedIsInternalValue, isInternal);
Volatile.Write(ref observedIsOnlySubscriberSet, 1);
});
await distributor.StartAsync(CancellationToken.None);
// Register ONLY an internal subscriber — no external subscriber is attached.
using IEventSubscriberLease internalLease = distributor.Register(isInternal: true);
// Push enough events to overflow the capacity-2 internal subscriber channel.
for (ulong sequence = 1; sequence <= 10; sequence++)
{
source.Writer.TryWrite(Event(sequence));
}
// The internal subscriber is disconnected with the overflow fault.
SessionManagerException fault = await Assert.ThrowsAsync<SessionManagerException>(
async () => await DrainUntilFaultAsync(internalLease.Reader));
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, fault.ErrorCode);
// Wait for the handler to fire (it runs on the pump thread).
await Task.Run(async () =>
{
using CancellationTokenSource cts = new(ReadTimeout);
while (Volatile.Read(ref observedIsOnlySubscriberSet) == 0)
{
await Task.Delay(10, cts.Token);
}
});
// isOnlySubscriber must be FALSE even though the internal subscriber was the ONLY
// subscriber — CountExternalSubscribers excludes it, so a FailFast policy on the
// external count would NOT fault the session.
Assert.True(Volatile.Read(ref observedIsOnlySubscriberSet) == 1, "Overflow handler should have fired.");
Assert.False(Volatile.Read(ref observedIsOnlySubscriberValue),
"isOnlySubscriber must be false for an internal subscriber (CountExternalSubscribers excludes it).");
Assert.True(Volatile.Read(ref observedIsInternalValue),
"isInternal must be true for a subscriber registered with isInternal: true.");
}
private static async Task DrainUntilFaultAsync(ChannelReader<MxEvent> reader)
{
// Drains any buffered events, then surfaces the channel's completion fault (if any)
// by awaiting the final read past the buffered tail.
while (true)
{
await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout);
while (reader.TryRead(out _))
{
}
}
}
private static SessionEventDistributor CreateDistributor(ChannelReader<MxEvent> source)
=> CreateDistributor(source, replayBufferCapacity: 1024, replayRetentionSeconds: 300);
private static SessionEventDistributor CreateDistributor(
ChannelReader<MxEvent> source,
int replayBufferCapacity,
double replayRetentionSeconds,
TimeProvider? timeProvider = null)
=> new(
"session-test",
ct => source.ReadAllAsync(ct),
subscriberQueueCapacity: 64,
replayBufferCapacity: replayBufferCapacity,
replayRetentionSeconds: replayRetentionSeconds,
NullLogger<SessionEventDistributor>.Instance,
timeProvider ?? TimeProvider.System);
private static MxEvent Event(ulong sequence)
=> new() { SessionId = "session-test", WorkerSequence = sequence };
private static async Task<MxEvent> ReadOneAsync(ChannelReader<MxEvent> reader)
{
await reader.WaitToReadAsync().AsTask().WaitAsync(ReadTimeout);
Assert.True(reader.TryRead(out MxEvent? value));
return value!;
}
private static async Task AssertCompletedAsync(ChannelReader<MxEvent> reader)
{
// Drain anything still buffered, then assert the channel is completed
// (no further events). Bounded so a never-completing channel fails fast.
await reader.Completion.WaitAsync(ReadTimeout);
}
}
@@ -663,7 +663,7 @@ public sealed class SessionManagerBulkTests
private static async Task<GatewaySession> OpenSessionAsync(IWorkerClient workerClient)
{
SessionManager manager = CreateManager(workerClient);
return await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
return await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
}
private static SessionManager CreateManager(IWorkerClient workerClient)
@@ -23,7 +23,7 @@ public sealed class SessionManagerTests
using GatewayMetrics metrics = new();
SessionManager manager = CreateManager(factory, metrics: metrics);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
Assert.True(manager.TryGetSession(session.SessionId, out GatewaySession? registered));
Assert.Same(session, registered);
@@ -34,6 +34,36 @@ public sealed class SessionManagerTests
Assert.Equal(1, metrics.GetSnapshot().SessionsOpened);
}
/// <summary>Verifies that a session opened by an authenticated caller records that caller's API key id in OwnerKeyId.</summary>
[Fact]
public async Task OpenSessionAsync_WithOwnerKeyId_RecordsOwnerKeyIdOnSession()
{
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(new FakeWorkerClient()));
GatewaySession session = await manager.OpenSessionAsync(
CreateOpenRequest(),
clientIdentity: "MyKey Display",
ownerKeyId: "key-abc123",
CancellationToken.None);
Assert.Equal("key-abc123", session.OwnerKeyId);
}
/// <summary>Verifies that a session opened without an owner key id records null in OwnerKeyId.</summary>
[Fact]
public async Task OpenSessionAsync_WithNullOwnerKeyId_RecordsNullOwnerKeyIdOnSession()
{
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(new FakeWorkerClient()));
GatewaySession session = await manager.OpenSessionAsync(
CreateOpenRequest(),
clientIdentity: null,
ownerKeyId: null,
CancellationToken.None);
Assert.Null(session.OwnerKeyId);
}
/// <summary>Verifies that opening a session sets the initial lease expiry from the configured default lease.</summary>
[Fact]
public async Task OpenSessionAsync_SetsInitialDefaultLease()
@@ -45,7 +75,7 @@ public sealed class SessionManagerTests
options: options,
timeProvider: clock);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
Assert.Equal(clock.GetUtcNow() + TimeSpan.FromMinutes(30), session.LeaseExpiresAt);
}
@@ -61,7 +91,7 @@ public sealed class SessionManagerTests
};
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(new FakeWorkerClient()));
GatewaySession session = await manager.OpenSessionAsync(request, "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(request, "client-1", ownerKeyId: null, CancellationToken.None);
Assert.Equal($"rust-load-client-{session.SessionId}", session.ClientCorrelationId);
}
@@ -76,7 +106,7 @@ public sealed class SessionManagerTests
};
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(new FakeWorkerClient()));
GatewaySession session = await manager.OpenSessionAsync(request, "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(request, "client-1", ownerKeyId: null, CancellationToken.None);
Assert.Equal($"client-{session.SessionId}", session.ClientCorrelationId);
}
@@ -87,7 +117,7 @@ public sealed class SessionManagerTests
{
FakeWorkerClient workerClient = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
WorkerCommandReply reply = await manager.InvokeAsync(
session.SessionId,
@@ -108,6 +138,7 @@ public sealed class SessionManagerTests
"mxaccess-gateway-1-session-lease-refresh",
"nonce",
"client-1",
null,
"test-session",
"client-correlation-1",
TimeSpan.FromSeconds(30),
@@ -156,7 +187,7 @@ public sealed class SessionManagerTests
},
};
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
IReadOnlyList<SubscribeResult> results = await session.SubscribeBulkAsync(
12,
@@ -207,7 +238,7 @@ public sealed class SessionManagerTests
},
};
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
IReadOnlyList<BulkWriteResult> results = await session.WriteBulkAsync(
12,
@@ -268,7 +299,7 @@ public sealed class SessionManagerTests
},
};
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
IReadOnlyList<BulkReadResult> results = await session.ReadBulkAsync(
12,
@@ -291,7 +322,7 @@ public sealed class SessionManagerTests
{
FakeWorkerClient workerClient = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
session.MarkFaulted("test fault");
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
@@ -316,7 +347,7 @@ public sealed class SessionManagerTests
{
FakeWorkerClient workerClient = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
// Force a state mismatch: session stays Ready, worker transitions out.
workerClient.State = WorkerClientState.Handshaking;
@@ -341,7 +372,7 @@ public sealed class SessionManagerTests
FakeWorkerClient workerClient = new();
using GatewayMetrics metrics = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient), metrics: metrics);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
SessionCloseResult firstClose = await manager.CloseSessionAsync(session.SessionId, CancellationToken.None);
SessionManagerException secondClose = await Assert.ThrowsAsync<SessionManagerException>(
@@ -366,7 +397,7 @@ public sealed class SessionManagerTests
"Worker shutdown timed out."),
};
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await manager.CloseSessionAsync(session.SessionId, CancellationToken.None));
@@ -397,6 +428,7 @@ public sealed class SessionManagerTests
GatewaySession firstSession = await manager.OpenSessionAsync(
CreateOpenRequest(),
"client-1",
ownerKeyId: null,
CancellationToken.None);
metrics.EventReceived(firstSession.SessionId, MxEventFamily.OnDataChange.ToString());
@@ -405,6 +437,7 @@ public sealed class SessionManagerTests
GatewaySession secondSession = await manager.OpenSessionAsync(
CreateOpenRequest(),
"client-2",
ownerKeyId: null,
CancellationToken.None);
Assert.Equal(SessionManagerErrorCode.CloseFailed, exception.ErrorCode);
@@ -440,6 +473,7 @@ public sealed class SessionManagerTests
GatewaySession session = await manager.OpenSessionAsync(
CreateOpenRequest(),
"client-1",
ownerKeyId: null,
CancellationToken.None);
Task<SessionCloseResult> firstClose = manager.CloseSessionAsync(session.SessionId, CancellationToken.None);
@@ -482,7 +516,7 @@ public sealed class SessionManagerTests
FakeWorkerClient workerClient = new();
using GatewayMetrics metrics = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient), metrics: metrics);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
SessionCloseResult result = await manager.KillWorkerAsync(session.SessionId, "test-kill", CancellationToken.None);
@@ -510,7 +544,7 @@ public sealed class SessionManagerTests
{
FakeWorkerClient workerClient = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
await Assert.ThrowsAsync<ArgumentException>(
async () => await manager.KillWorkerAsync(session.SessionId, blankReason, CancellationToken.None));
@@ -529,7 +563,7 @@ public sealed class SessionManagerTests
{
FakeWorkerClient workerClient = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
await Assert.ThrowsAsync<ArgumentNullException>(
async () => await manager.KillWorkerAsync(session.SessionId, null!, CancellationToken.None));
@@ -569,6 +603,7 @@ public sealed class SessionManagerTests
GatewaySession session = await manager.OpenSessionAsync(
CreateOpenRequest(),
"client-1",
ownerKeyId: null,
CancellationToken.None);
Assert.Equal(1, metrics.GetSnapshot().OpenSessions);
@@ -598,6 +633,7 @@ public sealed class SessionManagerTests
GatewaySession session = await manager.OpenSessionAsync(
CreateOpenRequest(),
"client-1",
ownerKeyId: null,
CancellationToken.None);
Task<SessionCloseResult> first = manager.KillWorkerAsync(session.SessionId, "kill-a", CancellationToken.None);
@@ -641,6 +677,7 @@ public sealed class SessionManagerTests
GatewaySession session = await manager.OpenSessionAsync(
CreateOpenRequest(),
"client-1",
ownerKeyId: null,
CancellationToken.None);
Assert.Equal(1, metrics.GetSnapshot().OpenSessions);
@@ -666,7 +703,7 @@ public sealed class SessionManagerTests
metrics);
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None));
async () => await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None));
Assert.Equal(SessionManagerErrorCode.OpenFailed, exception.ErrorCode);
Assert.Equal(0, registry.Count);
@@ -682,8 +719,8 @@ public sealed class SessionManagerTests
FakeWorkerClient activeClient = new();
QueueingSessionWorkerClientFactory factory = new(expiredClient, activeClient);
SessionManager manager = CreateManager(factory);
GatewaySession expiredSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession activeSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None);
GatewaySession expiredSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
GatewaySession activeSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", ownerKeyId: null, CancellationToken.None);
DateTimeOffset now = DateTimeOffset.UtcNow;
expiredSession.ExtendLease(now.AddSeconds(-1));
activeSession.ExtendLease(now.AddMinutes(5));
@@ -703,7 +740,7 @@ public sealed class SessionManagerTests
{
FakeWorkerClient workerClient = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
DateTimeOffset now = DateTimeOffset.UtcNow;
session.ExtendLease(now.AddSeconds(-1));
using IDisposable eventSubscriber = session.AttachEventSubscriber(allowMultipleSubscribers: false);
@@ -724,8 +761,8 @@ public sealed class SessionManagerTests
QueueingSessionWorkerClientFactory factory = new(firstClient, secondClient);
using GatewayMetrics metrics = new();
SessionManager manager = CreateManager(factory, metrics: metrics);
GatewaySession firstSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession secondSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None);
GatewaySession firstSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", ownerKeyId: null, CancellationToken.None);
GatewaySession secondSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", ownerKeyId: null, CancellationToken.None);
await manager.ShutdownAsync(CancellationToken.None);
@@ -416,6 +416,7 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
string? ownerKeyId,
CancellationToken cancellationToken)
{
OpenSessionCount++;