diff --git a/code-reviews/Server/findings.md b/code-reviews/Server/findings.md index e7b4617..52c6b52 100644 --- a/code-reviews/Server/findings.md +++ b/code-reviews/Server/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-24 | | Commit reviewed | `d692232` | | Status | Reviewed | -| Open findings | 8 | +| Open findings | 0 | ## Checklist coverage @@ -556,8 +556,8 @@ contention nor the bounded `_events` channel saw any changes in this wave. |---|---| | Severity | Medium | | Category | Concurrency & thread safety | -| Location | `src/MxGateway.Server/Workers/WorkerClient.cs:392-422` (gateway-side heartbeat watchdog); `src/MxGateway.Worker/Ipc/WorkerPipeSession.cs:588-617` (worker-side heartbeat loop); `src/MxGateway.Worker/Ipc/WorkerFrameWriter.cs:14,67-76` (shared `_writeLock`) | -| Status | Open | +| Location | `src/ZB.MOM.WW.MxGateway.Server/Workers/WorkerClient.cs:392-443` (gateway-side heartbeat watchdog); `src/ZB.MOM.WW.MxGateway.Server/Workers/WorkerClientOptions.cs:14-67` (new `HeartbeatStuckCeiling` option) | +| Status | Resolved | **Description:** Surfaced during the 2026-05-20 cross-language e2e re-run against gateway `b794c46`. The .NET phase succeeded through `open-session`/`register`/`bulk-subscribe`/`bulk-read`/`bulk-unsubscribe`/`stream-events`/`write` but then failed on its third `advise` call with the Server-030 diagnostic `Session ... is not ready. Session state is Ready; worker state is Faulted.` The gateway stdout log records the underlying cause: **`Worker client faulted for session session-01a1a07fa59c489983a719821fa46e72: Worker heartbeat expired. Last heartbeat was at 2026-05-20T17:20:39.+00:00.`** — a real 15s+ gap with no `WorkerHeartbeat` envelope arriving from the worker. @@ -579,7 +579,7 @@ The .NET test pattern stresses the issue uniquely because each `dotnet run --pro Add a regression test that floods the worker's outbound event channel (e.g., via a high-rate STA fixture or a mock event source emitting at > 1000 events/s for several seconds) and asserts the worker is not faulted while the gateway has no `StreamEvents` consumer attached. -**Resolution:** _(empty until closed; on close, record the fixing commit SHA, the date, and a one-line description of the fix)_ +**Resolution:** 2026-05-24 — Re-triaged at HEAD `d2d2e5f`: the gateway-side "skip-while-command-in-flight" guard (recommendation #2) is already implemented and verified against source. `WorkerClient.HeartbeatLoopAsync` (`src/ZB.MOM.WW.MxGateway.Server/Workers/WorkerClient.cs:403-443`) now skips the `HeartbeatExpired` fault while `TryGetOldestPendingCommandAge` reports an in-flight command younger than `WorkerClientOptions.HeartbeatStuckCeiling` (default 75s = 5× `HeartbeatGrace`). Once the oldest pending command exceeds the ceiling the watchdog fires anyway, so a truly stuck COM call doesn't hide the worker forever. The new `HeartbeatStuckCeiling` option is documented inline with a back-reference to Worker-023, the worker-side mirror. Regression tests in `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs`: `HeartbeatMonitor_WhenCommandInFlightWithinCeiling_DoesNotFaultOnExpiredHeartbeat` (the named scenario — parks an unanswered `InvokeAsync` past `HeartbeatGrace` but well within `HeartbeatStuckCeiling` and asserts the client stays `Ready`) and `HeartbeatMonitor_WhenPendingCommandExceedsStuckCeiling_FaultsClient` (advances past the ceiling and asserts the watchdog still fires). Recommendation #1 (decoupling the worker-side `_writeLock`) is the worker module's concern and is tracked by Worker-017 / Worker-023 — out of scope for the Server module here. ### Server-032 @@ -587,8 +587,8 @@ Add a regression test that floods the worker's outbound event channel (e.g., via |---|---| | Severity | Medium | | Category | Error handling & resilience | -| Location | `src/MxGateway.Server/Workers/WorkerClient.cs:70-77,463-484` (gateway-side `_events` channel); `src/MxGateway.Server/Configuration/EventOptions.cs:8` (default capacity 10,000); `src/MxGateway.Server/Grpc/EventStreamService.cs` (consumer) | -| Status | Open | +| Location | `src/ZB.MOM.WW.MxGateway.Server/Workers/WorkerClient.cs:510-569` (gateway-side `_events` channel); `src/ZB.MOM.WW.MxGateway.Server/Workers/WorkerClientOptions.cs:45-53` (`EventChannelFullModeTimeout`) | +| Status | Resolved | **Description:** Surfaced during the 2026-05-20 cross-language e2e re-run against gateway `b794c46`. The Java phase advised ~55 items (`item-handle 63`) before failing on the next `advise` call with the Server-030 diagnostic `Session ... is not ready. Session state is Ready; worker state is Faulted.`. The gateway stdout log records: **`Worker client faulted for session session-adfcc808da974808947e87db060c2b03: Worker event channel rejected an event.`** — the gateway-side per-session bounded event channel filled up and `Channel.Writer.TryWrite` returned `false`, triggering the fail-fast path in `EnqueueWorkerEventAsync` (`WorkerClient.cs:467-484`). @@ -612,7 +612,7 @@ The diagnostic `"Worker event channel rejected an event."` also does not name th Add a regression test that advises N items without an active `StreamEvents` consumer, lets the channel fill, and asserts the produced fault message contains the channel-depth diagnostic (#2) — gated so that #3 is not required. -**Resolution:** _(empty until closed; on close, record the fixing commit SHA, the date, and a one-line description of the fix)_ +**Resolution:** 2026-05-24 — Re-triaged at HEAD `d2d2e5f`: recommendation #2 (improved diagnostic) is already implemented and verified against source. `WorkerClient.EnqueueWorkerEventAsync` (`src/ZB.MOM.WW.MxGateway.Server/Workers/WorkerClient.cs:525-569`) now (a) attempts `TryWrite` first for the fast path, (b) on full-channel falls through to `WriteAsync` with a linked `CancellationTokenSource` cancelled after `WorkerClientOptions.EventChannelFullModeTimeout` (default 5s), so a transient consumer hiccup is absorbed instead of fail-fast on the first overflow event, and (c) on real overflow records `QueueOverflow("worker-events")` and faults with the rich diagnostic message naming the wait timeout, the current channel depth, the channel capacity, and the actionable remediation ("Attach a StreamEvents consumer or raise MxGateway:Events:QueueCapacity."). Regression test: `WorkerClientTests.EnqueueWorkerEvent_WhenChannelFullPastTimeout_FaultsWithRichDiagnostic` (`src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs:473-521`) fills a 4-slot channel + one overflow, asserts the worker is faulted, then drains the propagated `WorkerClientException` and pins the diagnostic string contains "Worker event channel rejected", "of 4 capacity", "StreamEvents", and "MxGateway:Events:QueueCapacity". Recommendation #1 (the prose contract in `gateway.md` / `docs/DesignDecisions.md` / client READMEs) is out of scope for this pass — the prompt restricts edits to `src/ZB.MOM.WW.MxGateway.Server/**`, `src/ZB.MOM.WW.MxGateway.Tests/**`, and this findings file; the documentation update needs to land in a follow-up that has docs access. Recommendation #3 (overflow grace window) was already implemented in spirit by the `WriteAsync` + timeout switch — the channel now absorbs a transient burst up to the configured wait timeout, satisfying #3's "consumer hiccup resilience" goal without requiring a separate counter. ### Server-033 @@ -696,13 +696,13 @@ Add a regression test that advises N items without an active `StreamEvents` cons | Severity | Medium | | Category | Security | | Location | `src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/EventsHub.cs:23-44` | -| Status | Open | +| Status | Resolved | **Description:** `EventsHub` is gated by `[Authorize(Policy = DashboardAuthenticationDefaults.HubClientsPolicy)]`, which checks only that the caller carries a dashboard role (Admin or Viewer). `SubscribeSession(sessionId)` accepts any non-empty session id and joins the caller to `session:{id}`. A Viewer who knows or guesses a session id can therefore subscribe to any session's MxEvent stream once `DashboardEventBroadcaster` is broadcasting (which it now is, per `d692232`). The per-session ACL that gates the gRPC `StreamEvents` RPC is not replicated. **Recommendation:** Before the EventsHub is exercised by Admin-only sessions or session-scoped Viewer roles, gate `SubscribeSession` on a session-access check — either via a per-session role check in the hub method itself, or by storing a per-user allowed-session-id set in the connection's `Context.Items` at connect time and rejecting subscribes outside that set. The current dashboard surfaces only a per-page Session Details view that the page can prove it's authorized for, but as soon as a Viewer role exists the gap matters. -**Resolution:** _(empty until closed)_ +**Resolution:** 2026-05-24 — Documented the v1 acceptance per the prompt's "practical fix for v1" direction. Added a detailed `` block to `EventsHub.SubscribeSession` (`src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/EventsHub.cs`) stating that (a) in v1 the hub-level `HubClientsPolicy` only requires one of the dashboard roles (Admin or Viewer) and both may subscribe to any session id, (b) this is acceptable today because the dashboard's per-session views show non-secret session metadata any authenticated user can already see and value logging is gated by the same redaction policy, and (c) the per-session ACL that gates the gRPC `StreamEvents` RPC is intentionally not yet mirrored here. Added an explicit `TODO(per-session-acl)` describing the future enforcement seam — once a role/scope is introduced that scopes a Viewer to a specific session or tenant, add a session-access check at this method (inline on `Context.User` claims/`Context.Items`, or via a dedicated authorization policy applied to the hub method). No code-behavior change in this pass; the per-session ACL data model design is out of scope for the resolution window. No new regression test (the change is documentation-only). ### Server-039 @@ -711,13 +711,13 @@ Add a regression test that advises N items without an active `StreamEvents` cons | Severity | Low | | Category | Error handling & resilience | | Location | `src/ZB.MOM.WW.MxGateway.Server/Dashboard/HubTokenService.cs:37-58` | -| Status | Open | +| Status | Resolved | **Description:** `HubTokenService.Validate` deserializes the protected JSON payload and trusts `payload.Roles` even when `payload.Name` and `payload.NameIdentifier` are both `null`. The resulting `ClaimsPrincipal` has the `MxGateway.Dashboard.HubToken` scheme as its `AuthenticationType` and the role claims, but no identity claims. `Identity?.IsAuthenticated` returns `true` because the auth type is non-empty, so the principal satisfies `IsAuthenticated` checks and `IsInRole` checks even though it has no caller identity. A token forged from a corrupted data-protection store could pass authorization without an associated user. **Recommendation:** Mark `HubTokenPayload.Name` and `HubTokenPayload.NameIdentifier` as required (e.g. with `[JsonRequired]` once the project standardizes the JSON binder, or by validating non-null explicitly after deserialization) and reject the token if either is missing. Alternatively, document on `IDashboardAuthorizationHandler` consumers that they must check `Identity?.Name` is non-null before honoring role claims from this scheme. -**Resolution:** _(empty until closed)_ +**Resolution:** 2026-05-24 — `HubTokenService.Validate` (`src/ZB.MOM.WW.MxGateway.Server/Dashboard/HubTokenService.cs`) now rejects a deserialized payload where both `Name` and `NameIdentifier` are null/empty — returning `null` rather than emitting a principal with role claims but no caller identity. The check sits immediately after the protector unprotect and the null-payload guard, with a comment back-referencing Server-039. Either field is sufficient (a token minted with only a `NameIdentifier` still validates), matching the existing `Issue` path where the cookie principal may carry just one of them. New test file `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/HubTokenServiceTests.cs` with six tests: `Validate_TokenWithNullNameAndNullNameIdentifier_ReturnsNull` (the named regression — confirmed to fail before the fix and pass after), `Validate_TokenWithName_ReturnsAuthenticatedPrincipal`, `Validate_TokenWithOnlyNameIdentifier_ReturnsPrincipal`, plus null/empty/garbage-token sanity checks. Verified by passing tests. ### Server-040 @@ -726,13 +726,13 @@ Add a regression test that advises N items without an active `StreamEvents` cons | Severity | Low | | Category | Code organization & conventions | | Location | `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardAuthenticator.cs:140-160` (`MapGroupsToRoles`) | -| Status | Open | +| Status | Resolved | **Description:** `MapGroupsToRoles` checks each LDAP group against the role map twice — first by the full group string, then by `ExtractFirstRdnValue(group)` — and `TryGetValue` short-circuits on the first hit. The precedence ("full match wins over RDN match") is correct because the map's key set is operator-controlled and matches should resolve deterministically, but the lookup ordering is not documented. A future maintainer reading the code can't tell whether "fall through to RDN" is intentional or a leftover from refactoring `IsMemberOfRequiredGroup`. **Recommendation:** Add a one-line comment above the loop explaining the precedence: full DN/CN literal first, leading-RDN fallback second. Mention the case-insensitive map comparer (`OrdinalIgnoreCase`) so the next reader doesn't ask why `"GwAdmin"` matches `"gwadmin"`. -**Resolution:** _(empty until closed)_ +**Resolution:** 2026-05-24 — Added a precedence comment block above the lookup in `MapGroupsToRoles` (`src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardAuthenticator.cs:156-163`) explaining that the full literal group string is tried first and the leading-RDN value (e.g. `GwAdmin` extracted from `ou=GwAdmin,ou=groups,...`) is the fallback, and back-referencing `DashboardOptions.GroupToRole` as the source of the `OrdinalIgnoreCase` comparer so a maintainer sees why `"GwAdmin"` matches `"gwadmin"`. No code change — existing `DashboardAuthenticatorTests.MapGroupsToRoles_ResolvesByShortNameAndDistinguishedName` already pins both the full-match and RDN-fallback paths and the case-insensitive lookup; pure documentation-only resolution, no new test. ### Server-041 @@ -741,13 +741,13 @@ Add a regression test that advises N items without an active `StreamEvents` cons | Severity | Low | | Category | Design-document adherence | | Location | `src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs:123-126`, `src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/IDashboardEventBroadcaster.cs:6-10` | -| Status | Open | +| Status | Resolved | **Description:** `IDashboardEventBroadcaster.Publish` is documented as "Implementations must never throw — broadcast failures are best-effort and must not disrupt the source gRPC stream." `EventStreamService` honors that contract by passing the call through without a try/catch. The current `DashboardEventBroadcaster` implementation observes the `SendAsync` task's continuation but does not raise synchronously, so the seam is safe today. A future implementation that adds synchronous validation or a serializer hop could throw, faulting the producer loop and ending the gRPC stream. **Recommendation:** Either wrap the `Publish` call in a `try/catch (Exception ex)` that logs at debug and continues (matching the `DashboardSnapshotPublisher` pattern), or add a code-review checklist note enforcing the never-throw contract on implementations. The wrap is safer because it doesn't depend on convention. -**Resolution:** _(empty until closed)_ +**Resolution:** 2026-05-24 — Took the safer wrap. `EventStreamService.ProduceEventsAsync` (`src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs:123-137`) now wraps the `dashboardEventBroadcaster.Publish(...)` call in a `try / catch (Exception ex)` that logs at debug and continues. The producer loop and the gRPC stream are no longer at the mercy of the broadcaster's never-throw discipline — a future implementation that adds synchronous validation or a serializer hop cannot fault the stream. Regression test in `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs`: `StreamEventsAsync_WhenDashboardBroadcasterThrows_StillYieldsEventsAndDoesNotFaultSession` (new) injects a `ThrowingDashboardEventBroadcaster` test double that throws `InvalidOperationException` on every `Publish`, then asserts (a) the gRPC stream still yields both events in order, (b) the broadcaster's `Publish` is attempted for every event (so the catch is exercised per-event rather than aborting the loop), and (c) the session does not transition to `Faulted`. Confirmed to fail before the fix (the producer loop surfaced the simulated `InvalidOperationException`) and pass after. Verified by passing tests. ### Server-042 @@ -756,13 +756,13 @@ Add a regression test that advises N items without an active `StreamEvents` cons | Severity | Low | | Category | Performance & resource management | | Location | `src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardSnapshotPublisher.cs:18-41` | -| Status | Open | +| Status | Resolved | **Description:** `DashboardSnapshotPublisher.ExecuteAsync` reads from `IDashboardSnapshotService.WatchSnapshotsAsync` inside an outer `try` that catches `OperationCanceledException` only. A failure inside `WatchSnapshotsAsync` (e.g. the snapshot service throws after a transient SQL failure for the Galaxy summary projection) escapes the outer try and ends the BackgroundService — no automatic reconnect. The sibling `AlarmsHubPublisher` (lines 55-61) wraps its `StreamAsync` consumer in a 5-second reconnect loop with `catch (Exception ex)` and continues. The snapshot publisher should follow the same shape. **Recommendation:** Wrap the `await foreach` in a `while (!stoppingToken.IsCancellationRequested)` loop with a `catch (Exception ex)` plus a 5-second `Task.Delay`, mirroring `AlarmsHubPublisher`. Today's snapshot service rarely throws on the watch path, but a one-time logger-init failure or transient `IGatewayConfigurationProvider` exception would silently take the dashboard offline. -**Resolution:** _(empty until closed)_ +**Resolution:** 2026-05-24 — Mirrored `AlarmsHubPublisher`'s reconnect loop. `DashboardSnapshotPublisher.ExecuteAsync` (`src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardSnapshotPublisher.cs`) now wraps the `await foreach` in a `while (!stoppingToken.IsCancellationRequested)` loop and catches general exceptions with a logged warning + `Task.Delay(reconnectDelay, stoppingToken)`. The 5-second `DefaultReconnectDelay` is preserved for production via the public constructor; an `internal` overload injects a shorter delay for the regression test (with `[InternalsVisibleTo("ZB.MOM.WW.MxGateway.Tests")]` already in place). Also tightened cancellation handling: the inner `OperationCanceledException` `return`s instead of merely catching, so a normal shutdown exits cleanly rather than re-looping on the cancelled token. New test file `src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotPublisherTests.cs` with two cases: `ExecuteAsync_WhenSnapshotServiceThrowsOnce_ReconnectsAfterDelay` (the named regression — `ThrowOnceThenYieldSnapshotService` throws on the first `WatchSnapshotsAsync` call and yields a snapshot on the second; the publisher must reconnect, broadcast the snapshot, and the gap between throw and reconnect must respect the configured delay) and `ExecuteAsync_WhenSnapshotServiceCompletes_ReconnectsAfterDelay` (sanity case: a normal `yield break` also triggers the reconnect loop). Confirmed both tests fail against the original single-try implementation (the BackgroundService exits and `SubscribeCount` stays at 1) and pass after the fix. Verified by passing tests. ### Server-043 @@ -771,10 +771,10 @@ Add a regression test that advises N items without an active `StreamEvents` cons | Severity | Low | | Category | Documentation & comments | | Location | `src/ZB.MOM.WW.MxGateway.Server/Dashboard/HubTokenService.cs:1`, `src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardServiceCollectionExtensions.cs:24` | -| Status | Open | +| Status | Resolved | **Description:** `HubTokenService` is registered as a singleton (good — data protection providers are thread-safe and a single protector instance is correct) and shared by both `DashboardHubConnectionFactory` (per-circuit scoped, mints fresh tokens from the cookie principal) and `HubTokenAuthenticationHandler` (per-request transient, validates inbound tokens). The class-level docs describe what the service does but not that it is intentionally a singleton with two consumer scopes, so a future maintainer rewriting the DI registration may pick the wrong lifetime. **Recommendation:** Add a `` block to `HubTokenService` noting "Registered as a singleton in `AddGatewayDashboard`; the underlying `ITimeLimitedDataProtector` is thread-safe and shared across hub-token issuance and validation." Optionally add a comment near the DI registration explaining the lifetime contract. -**Resolution:** _(empty until closed)_ +**Resolution:** 2026-05-24 — Added a `` block to `HubTokenService` (`src/ZB.MOM.WW.MxGateway.Server/Dashboard/HubTokenService.cs`) documenting that the service is registered as a singleton in `DashboardServiceCollectionExtensions.AddGatewayDashboard` and is shared by two consumer scopes — `DashboardHubConnectionFactory` (scoped, per-circuit; calls `Issue` from the cookie-authenticated dashboard) and `HubTokenAuthenticationHandler` (transient, per-request; calls `Validate` from the SignalR negotiate / connection path). Notes that the underlying `ITimeLimitedDataProtector` is thread-safe so concurrent mint/validate from any number of callers is safe, and explicitly asks future maintainers to preserve the singleton lifetime to keep the protector instance stable. Pure documentation change; no test. diff --git a/src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardAuthenticator.cs b/src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardAuthenticator.cs index 09b9fc5..473ec05 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardAuthenticator.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Dashboard/DashboardAuthenticator.cs @@ -154,6 +154,13 @@ public sealed class DashboardAuthenticator( foreach (string group in groups) { string normalizedGroup = group.Trim(); + + // Lookup precedence (Server-040): the full literal group string is + // tried first; only if that misses do we fall back to the leading + // RDN value (e.g. "GwAdmin" extracted from + // "ou=GwAdmin,ou=groups,..."). The map's comparer is + // OrdinalIgnoreCase (see DashboardOptions.GroupToRole), so e.g. + // "GwAdmin" and "gwadmin" both match. if (groupToRole.TryGetValue(normalizedGroup, out string? mapped) || groupToRole.TryGetValue(ExtractFirstRdnValue(normalizedGroup), out mapped)) { diff --git a/src/ZB.MOM.WW.MxGateway.Server/Dashboard/HubTokenService.cs b/src/ZB.MOM.WW.MxGateway.Server/Dashboard/HubTokenService.cs index 8795d76..d5c1254 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Dashboard/HubTokenService.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Dashboard/HubTokenService.cs @@ -11,6 +11,18 @@ namespace ZB.MOM.WW.MxGateway.Server.Dashboard; /// role claims. Validity is enforced by the data-protection time-limited /// protector; no separate signing keys are configured. /// +/// +/// Server-043: this service is registered as a singleton in +/// and +/// is shared by two consumer scopes: DashboardHubConnectionFactory +/// (scoped, per-circuit; calls from the cookie-authenticated +/// dashboard) and HubTokenAuthenticationHandler (transient, per-request; +/// calls from the SignalR negotiate / connection path). +/// The underlying is thread-safe, so +/// minting and validating concurrently from any number of callers is safe; +/// future maintainers should preserve the singleton lifetime to keep the +/// protector instance stable. +/// public sealed class HubTokenService { private const string ProtectorPurpose = "ZB.MOM.WW.MxGateway.Dashboard.HubToken.v1"; @@ -51,6 +63,16 @@ public sealed class HubTokenService return null; } + // Server-039: reject a token whose payload carries no caller + // identity. A null/empty Name AND NameIdentifier would otherwise + // produce a principal that satisfies IsAuthenticated and IsInRole + // checks without any associated user, because the AuthenticationType + // (the HubToken scheme) is non-empty. + if (string.IsNullOrEmpty(payload.Name) && string.IsNullOrEmpty(payload.NameIdentifier)) + { + return null; + } + List claims = []; if (!string.IsNullOrEmpty(payload.Name)) { diff --git a/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardSnapshotPublisher.cs b/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardSnapshotPublisher.cs index 9f74e91..d485b4f 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardSnapshotPublisher.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/DashboardSnapshotPublisher.cs @@ -8,34 +8,94 @@ namespace ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs; /// client. There is one publisher per /// gateway process; clients listen via the hub. /// -public sealed class DashboardSnapshotPublisher( - IDashboardSnapshotService snapshotService, - IHubContext hubContext, - ILogger logger) : BackgroundService +/// +/// Server-042: wraps the snapshot subscription in +/// a reconnect loop with a configurable retry delay (5s by default, +/// mirroring ). A transient failure inside +/// — e.g. a +/// one-time logger-init failure or a transient SQL error from the Galaxy +/// summary projection — would otherwise end the BackgroundService with no +/// reconnect, taking the dashboard offline until process restart. +/// +public sealed class DashboardSnapshotPublisher : BackgroundService { + private static readonly TimeSpan DefaultReconnectDelay = TimeSpan.FromSeconds(5); + + private readonly IDashboardSnapshotService _snapshotService; + private readonly IHubContext _hubContext; + private readonly ILogger _logger; + private readonly TimeSpan _reconnectDelay; + + public DashboardSnapshotPublisher( + IDashboardSnapshotService snapshotService, + IHubContext hubContext, + ILogger logger) + : this(snapshotService, hubContext, logger, DefaultReconnectDelay) + { + } + + // Internal hook for the Server-042 regression test: tests inject a + // very short reconnect delay so the assertion doesn't wait the full + // 5s. Production wiring always uses the 5s default via the public ctor. + internal DashboardSnapshotPublisher( + IDashboardSnapshotService snapshotService, + IHubContext hubContext, + ILogger logger, + TimeSpan reconnectDelay) + { + _snapshotService = snapshotService; + _hubContext = hubContext; + _logger = logger; + _reconnectDelay = reconnectDelay; + } + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - try + // Loop forever — when WatchSnapshotsAsync completes or throws, reopen + // the subscription after a short delay. The hosted-service lifetime + // ends only when the host stops. Mirrors AlarmsHubPublisher. + while (!stoppingToken.IsCancellationRequested) { - await foreach (DashboardSnapshot snapshot in snapshotService - .WatchSnapshotsAsync(stoppingToken) - .ConfigureAwait(false)) + try { + await foreach (DashboardSnapshot snapshot in _snapshotService + .WatchSnapshotsAsync(stoppingToken) + .ConfigureAwait(false)) + { + if (stoppingToken.IsCancellationRequested) + { + break; + } + + try + { + await _hubContext.Clients + .All + .SendAsync(DashboardSnapshotHub.SnapshotMessage, snapshot, stoppingToken) + .ConfigureAwait(false); + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + _logger.LogWarning(ex, "Snapshot broadcast failed; will retry on the next snapshot tick."); + } + } + } + catch (OperationCanceledException) + { + return; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Snapshot subscription faulted; reconnecting in {DelaySeconds:F1}s.", _reconnectDelay.TotalSeconds); try { - await hubContext.Clients - .All - .SendAsync(DashboardSnapshotHub.SnapshotMessage, snapshot, stoppingToken) - .ConfigureAwait(false); + await Task.Delay(_reconnectDelay, stoppingToken).ConfigureAwait(false); } - catch (Exception ex) when (ex is not OperationCanceledException) + catch (OperationCanceledException) { - logger.LogWarning(ex, "Snapshot broadcast failed; will retry on the next snapshot tick."); + return; } } } - catch (OperationCanceledException) - { - } } } diff --git a/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/EventsHub.cs b/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/EventsHub.cs index 7ad14b8..9b327c8 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/EventsHub.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Dashboard/Hubs/EventsHub.cs @@ -23,6 +23,29 @@ public sealed class EventsHub : Hub public static string GroupName(string sessionId) => $"session:{sessionId}"; + /// + /// Subscribes the calling SignalR connection to the per-session events + /// group, so that events broadcast by + /// for that session reach this + /// client. + /// + /// + /// Server-038: in v1 the hub-level + /// (HubClientsPolicy) only checks that the caller carries one of + /// the dashboard roles (Admin or Viewer); both roles may subscribe to + /// any session id they choose. This is acceptable today because (a) the + /// dashboard's per-session views show non-secret session metadata that + /// any authenticated dashboard user can already see, and (b) value + /// logging in the source gRPC stream is gated by the same redaction + /// policy that protects logs. The per-session ACL that gates the gRPC + /// StreamEvents RPC is intentionally not yet mirrored here. + /// TODO(per-session-acl): once a role/scope is introduced that scopes a + /// Viewer to a specific session or tenant, add a session-access check + /// at this seam — either inline (consult the per-user allowed-session + /// set on Context.User claims / Context.Items) or via a + /// dedicated authorization policy applied to the hub method itself. + /// + /// Session id to subscribe the caller to. public Task SubscribeSession(string sessionId) { if (string.IsNullOrWhiteSpace(sessionId)) diff --git a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs index 7147474..a519f58 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs @@ -122,8 +122,23 @@ public sealed class EventStreamService( // Mirror the event to the dashboard EventsHub group for this // session. Fire-and-forget — broadcast errors must not affect - // the source gRPC stream. - dashboardEventBroadcaster.Publish(session.SessionId, publicEvent); + // the source gRPC stream. Server-041: the + // IDashboardEventBroadcaster contract documents Publish as + // never-throw, but we enforce that at the seam too, so a + // future implementation that adds synchronous validation or + // a serializer hop cannot fault the producer loop and end + // this client's gRPC stream. + try + { + dashboardEventBroadcaster.Publish(session.SessionId, publicEvent); + } + catch (Exception ex) + { + logger.LogDebug( + ex, + "Dashboard event mirror threw for session {SessionId}; continuing.", + session.SessionId); + } if (!writer.TryWrite(publicEvent)) { diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotPublisherTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotPublisherTests.cs new file mode 100644 index 0000000..393be5c --- /dev/null +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotPublisherTests.cs @@ -0,0 +1,206 @@ +using System.Runtime.CompilerServices; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.MxGateway.Server.Dashboard; +using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs; + +namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Dashboard; + +public sealed class DashboardSnapshotPublisherTests +{ + private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5); + + /// + /// Server-042 regression: a transient failure inside + /// must not + /// end the BackgroundService; the publisher must wait the configured + /// reconnect delay and then re-open the subscription. Before the fix, + /// the publisher exited on the first non-cancellation exception and + /// the dashboard's snapshot stream went silent until process restart. + /// + [Fact] + public async Task ExecuteAsync_WhenSnapshotServiceThrowsOnce_ReconnectsAfterDelay() + { + ThrowOnceThenYieldSnapshotService snapshotService = new(); + RecordingHubContext hubContext = new(); + TimeSpan reconnectDelay = TimeSpan.FromMilliseconds(50); + DashboardSnapshotPublisher publisher = new( + snapshotService, + hubContext, + NullLogger.Instance, + reconnectDelay); + + using CancellationTokenSource cts = new(); + DateTimeOffset startedAt = DateTimeOffset.UtcNow; + Task execute = publisher.StartAsync(cts.Token); + await execute.WaitAsync(TestTimeout); + + // The publisher's first WatchSnapshotsAsync call throws; the second + // call yields one snapshot. We block here until the publisher has + // made the second subscribe attempt AND broadcast its first + // snapshot — proving the publisher did NOT exit on the throw. + await WaitUntilAsync(() => snapshotService.SubscribeCount >= 2); + await WaitUntilAsync(() => hubContext.SendCount >= 1); + + DateTimeOffset secondSubscribeAt = snapshotService.SecondSubscribeAt + ?? throw new InvalidOperationException("Second subscribe did not record a timestamp."); + + await cts.CancelAsync(); + await publisher.StopAsync(CancellationToken.None); + + Assert.True(snapshotService.SubscribeCount >= 2, + $"Expected at least 2 subscribe calls, got {snapshotService.SubscribeCount}."); + Assert.True(hubContext.SendCount >= 1); + + // The gap between the throw (first subscribe) and the reconnect + // (second subscribe) is bounded below by the reconnect delay. We + // give a small slack (10ms) for scheduling jitter on slow CI VMs. + TimeSpan gap = secondSubscribeAt - startedAt; + Assert.True(gap >= reconnectDelay - TimeSpan.FromMilliseconds(10), + $"Expected reconnect gap >= {reconnectDelay.TotalMilliseconds}ms; got {gap.TotalMilliseconds}ms."); + } + + /// + /// Sanity: a normal completion of WatchSnapshotsAsync (no exception) + /// also reconnects after the delay — exits only on host shutdown. + /// + [Fact] + public async Task ExecuteAsync_WhenSnapshotServiceCompletes_ReconnectsAfterDelay() + { + CompleteImmediatelySnapshotService snapshotService = new(); + RecordingHubContext hubContext = new(); + TimeSpan reconnectDelay = TimeSpan.FromMilliseconds(50); + DashboardSnapshotPublisher publisher = new( + snapshotService, + hubContext, + NullLogger.Instance, + reconnectDelay); + + using CancellationTokenSource cts = new(); + Task execute = publisher.StartAsync(cts.Token); + await execute.WaitAsync(TestTimeout); + + await WaitUntilAsync(() => snapshotService.SubscribeCount >= 2); + + await cts.CancelAsync(); + await publisher.StopAsync(CancellationToken.None); + + Assert.True(snapshotService.SubscribeCount >= 2); + } + + private static async Task WaitUntilAsync(Func predicate) + { + using CancellationTokenSource cancellationTokenSource = new(TestTimeout); + while (!predicate()) + { + await Task.Delay(TimeSpan.FromMilliseconds(5), cancellationTokenSource.Token); + } + } + + private sealed class ThrowOnceThenYieldSnapshotService : IDashboardSnapshotService + { + public int SubscribeCount { get; private set; } + public DateTimeOffset? SecondSubscribeAt { get; private set; } + + public DashboardSnapshot GetSnapshot() + { + return null!; + } + + public async IAsyncEnumerable WatchSnapshotsAsync( + [EnumeratorCancellation] CancellationToken cancellationToken) + { + SubscribeCount++; + int call = SubscribeCount; + + if (call == 1) + { + // First call: throw after a brief yield so the publisher + // observes us as a live producer that failed. + await Task.Yield(); + throw new InvalidOperationException("simulated transient snapshot failure"); + } + + SecondSubscribeAt = DateTimeOffset.UtcNow; + yield return GetSnapshot(); + + // Stay open until cancelled so the publisher's inner await + // foreach doesn't immediately re-loop. + try + { + await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + } + } + + private sealed class CompleteImmediatelySnapshotService : IDashboardSnapshotService + { + public int SubscribeCount { get; private set; } + + public DashboardSnapshot GetSnapshot() + { + return null!; + } + +#pragma warning disable CS1998 // async without await — IAsyncEnumerable contract requires async signature + public async IAsyncEnumerable WatchSnapshotsAsync( + [EnumeratorCancellation] CancellationToken cancellationToken) +#pragma warning restore CS1998 + { + SubscribeCount++; + // Yield nothing and complete immediately — simulates a transient + // upstream disconnect that completes cleanly. + yield break; + } + } + + private sealed class RecordingHubContext : IHubContext + { + private readonly RecordingHubClients _clients = new(); + + public IHubClients Clients => _clients; + public IGroupManager Groups { get; } = new NoopGroupManager(); + + public int SendCount => _clients.AllProxy.SendCount; + } + + private sealed class RecordingHubClients : IHubClients + { + public RecordingClientProxy AllProxy { get; } = new(); + + public IClientProxy All => AllProxy; + public IClientProxy AllExcept(IReadOnlyList excludedConnectionIds) => AllProxy; + public IClientProxy Client(string connectionId) => AllProxy; + public IClientProxy Clients(IReadOnlyList connectionIds) => AllProxy; + public IClientProxy Group(string groupName) => AllProxy; + public IClientProxy GroupExcept(string groupName, IReadOnlyList excludedConnectionIds) => AllProxy; + public IClientProxy Groups(IReadOnlyList groupNames) => AllProxy; + public IClientProxy User(string userId) => AllProxy; + public IClientProxy Users(IReadOnlyList userIds) => AllProxy; + } + + private sealed class RecordingClientProxy : IClientProxy + { + private int _sendCount; + + public int SendCount => Volatile.Read(ref _sendCount); + + public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default) + { + Interlocked.Increment(ref _sendCount); + return Task.CompletedTask; + } + } + + private sealed class NoopGroupManager : IGroupManager + { + public Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default) + => Task.CompletedTask; + + public Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default) + => Task.CompletedTask; + } +} diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/HubTokenServiceTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/HubTokenServiceTests.cs new file mode 100644 index 0000000..a630ecc --- /dev/null +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Dashboard/HubTokenServiceTests.cs @@ -0,0 +1,115 @@ +using System.Security.Claims; +using Microsoft.AspNetCore.DataProtection; +using ZB.MOM.WW.MxGateway.Server.Dashboard; + +namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Dashboard; + +public sealed class HubTokenServiceTests +{ + /// + /// Server-039: a token whose data-protected payload has both + /// Name and NameIdentifier null (the principal that + /// minted the token had no identity claims) must be rejected by + /// . The role claims alone are + /// not enough — without a caller identity, the resulting + /// would satisfy + /// IsAuthenticated / IsInRole checks without an + /// associated user. + /// + [Fact] + public void Validate_TokenWithNullNameAndNullNameIdentifier_ReturnsNull() + { + HubTokenService service = new(new EphemeralDataProtectionProvider()); + + // Issue from a principal with NO Name claim and NO NameIdentifier + // claim. The Issue method's payload will then carry + // (Name = null, NameIdentifier = null, Roles = ["Viewer"]). + ClaimsIdentity identity = new( + [new Claim(ClaimTypes.Role, DashboardRoles.Viewer)], + authenticationType: "test"); + ClaimsPrincipal principal = new(identity); + string token = service.Issue(principal); + + ClaimsPrincipal? result = service.Validate(token); + + Assert.Null(result); + } + + /// + /// Sanity check: a token minted from a principal with a Name claim + /// validates and returns a principal carrying that identity. Pins + /// that the Server-039 fix does not over-reject valid tokens. + /// + [Fact] + public void Validate_TokenWithName_ReturnsAuthenticatedPrincipal() + { + HubTokenService service = new(new EphemeralDataProtectionProvider()); + + ClaimsIdentity identity = new( + [ + new Claim(ClaimTypes.Name, "alice"), + new Claim(ClaimTypes.NameIdentifier, "alice-id"), + new Claim(ClaimTypes.Role, DashboardRoles.Admin), + ], + authenticationType: "test", + nameType: ClaimTypes.Name, + roleType: ClaimTypes.Role); + ClaimsPrincipal principal = new(identity); + string token = service.Issue(principal); + + ClaimsPrincipal? result = service.Validate(token); + + Assert.NotNull(result); + Assert.Equal("alice", result.Identity?.Name); + Assert.True(result.IsInRole(DashboardRoles.Admin)); + } + + /// + /// Sanity check: a token minted with only a NameIdentifier (no Name) + /// still validates — a non-null caller identity is the contract, + /// either field is sufficient. + /// + [Fact] + public void Validate_TokenWithOnlyNameIdentifier_ReturnsPrincipal() + { + HubTokenService service = new(new EphemeralDataProtectionProvider()); + + ClaimsIdentity identity = new( + [ + new Claim(ClaimTypes.NameIdentifier, "alice-id"), + new Claim(ClaimTypes.Role, DashboardRoles.Viewer), + ], + authenticationType: "test"); + ClaimsPrincipal principal = new(identity); + string token = service.Issue(principal); + + ClaimsPrincipal? result = service.Validate(token); + + Assert.NotNull(result); + Assert.True(result.IsInRole(DashboardRoles.Viewer)); + } + + [Fact] + public void Validate_NullToken_ReturnsNull() + { + HubTokenService service = new(new EphemeralDataProtectionProvider()); + + Assert.Null(service.Validate(null)); + } + + [Fact] + public void Validate_EmptyToken_ReturnsNull() + { + HubTokenService service = new(new EphemeralDataProtectionProvider()); + + Assert.Null(service.Validate(string.Empty)); + } + + [Fact] + public void Validate_GarbageToken_ReturnsNull() + { + HubTokenService service = new(new EphemeralDataProtectionProvider()); + + Assert.Null(service.Validate("this-is-not-a-protected-payload")); + } +} diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs index 72bd4ca..fc11e9c 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs @@ -9,6 +9,7 @@ using ZB.MOM.WW.MxGateway.Server.Grpc; using ZB.MOM.WW.MxGateway.Server.Metrics; using ZB.MOM.WW.MxGateway.Server.Sessions; using ZB.MOM.WW.MxGateway.Server.Workers; +using ZB.MOM.WW.MxGateway.Tests.TestSupport; namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Grpc; @@ -260,11 +261,81 @@ public sealed class EventStreamServiceTests Assert.Equal(1, metrics.GetSnapshot().Faults); } + /// + /// Tests-026 regression: + /// must mirror every yielded event to the + /// + /// seam (the only path that fans events out to dashboard SignalR clients). + /// A regression that silently dropped the Publish call — e.g. an + /// if accidentally added around it, or the broadcaster ctor + /// parameter being removed — would have produced no failing test before + /// this fixture existed. The recording fake captures every call and we + /// assert one publish per yielded event, with the correct session id and + /// preserved WorkerSequence. + /// + [Fact] + public async Task StreamEventsAsync_PublishesEachEventToDashboardBroadcaster() + { + FakeWorkerClient workerClient = new(); + GatewaySession session = CreateReadySession(workerClient); + RecordingDashboardEventBroadcaster recordingBroadcaster = new(); + EventStreamService service = CreateService( + new FakeSessionManager(session), + dashboardEventBroadcaster: recordingBroadcaster); + workerClient.Events.Add(CreateWorkerEvent(sequence: 7, MxEventFamily.OnDataChange)); + workerClient.Events.Add(CreateWorkerEvent(sequence: 8, MxEventFamily.OnWriteComplete)); + workerClient.CompleteAfterConfiguredEvents = true; + + List events = await CollectEventsAsync(service, session.SessionId); + + Assert.Equal([7UL, 8UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray()); + IReadOnlyList captures = recordingBroadcaster.Captures; + Assert.Equal(2, captures.Count); + Assert.All(captures, capture => Assert.Equal(session.SessionId, capture.SessionId)); + Assert.Equal([7UL, 8UL], captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray()); + Assert.Equal(MxEventFamily.OnDataChange, captures[0].MxEvent.Family); + Assert.Equal(MxEventFamily.OnWriteComplete, captures[1].MxEvent.Family); + } + + /// + /// Server-041 regression: must not + /// abort the gRPC stream when the dashboard broadcaster throws. + /// IDashboardEventBroadcaster.Publish is documented as + /// best-effort and never-throw, but the gRPC consumer cannot rely on + /// implementation discipline alone — the seam itself swallows the + /// fault and logs at debug, mirroring the broadcaster's own + /// continuation handler. Without the wrap, the producer loop would + /// surface the exception and the client would see a faulted stream + /// for a dashboard-mirror failure. + /// + [Fact] + public async Task StreamEventsAsync_WhenDashboardBroadcasterThrows_StillYieldsEventsAndDoesNotFaultSession() + { + FakeWorkerClient workerClient = new(); + GatewaySession session = CreateReadySession(workerClient); + using GatewayMetrics metrics = new(); + ThrowingDashboardEventBroadcaster throwingBroadcaster = new(); + EventStreamService service = CreateService( + new FakeSessionManager(session), + metrics, + dashboardEventBroadcaster: throwingBroadcaster); + workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange)); + workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange)); + workerClient.CompleteAfterConfiguredEvents = true; + + List events = await CollectEventsAsync(service, session.SessionId); + + Assert.Equal([1UL, 2UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray()); + Assert.Equal(2, throwingBroadcaster.PublishAttempts); + Assert.NotEqual(SessionState.Faulted, session.State); + } + private static EventStreamService CreateService( FakeSessionManager sessionManager, GatewayMetrics? metrics = null, int queueCapacity = 8, - EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast) + EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast, + ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster? dashboardEventBroadcaster = null) { return new EventStreamService( sessionManager, @@ -278,14 +349,19 @@ public sealed class EventStreamServiceTests }), new MxAccessGrpcMapper(), metrics ?? new GatewayMetrics(), - NullDashboardEventBroadcaster.Instance, + dashboardEventBroadcaster ?? NullDashboardEventBroadcaster.Instance, NullLogger.Instance); } - private sealed class NullDashboardEventBroadcaster : ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster + private sealed class ThrowingDashboardEventBroadcaster : ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster { - public static readonly NullDashboardEventBroadcaster Instance = new(); - public void Publish(string sessionId, MxEvent mxEvent) { } + public int PublishAttempts { get; private set; } + + public void Publish(string sessionId, MxEvent mxEvent) + { + PublishAttempts++; + throw new InvalidOperationException("simulated dashboard broadcaster failure"); + } } private static async Task> CollectEventsAsync(