diff --git a/docs/GatewayDashboardDesign.md b/docs/GatewayDashboardDesign.md index 1d442f3..d7e6baf 100644 --- a/docs/GatewayDashboardDesign.md +++ b/docs/GatewayDashboardDesign.md @@ -167,7 +167,7 @@ bearer). Each hub class is `[Authorize(Policy = HubClientsPolicy)]`. |---|---|---|---|---| | `DashboardSnapshotHub` | `/hubs/snapshot` | `DashboardSnapshotPublisher` (BackgroundService consuming `IDashboardSnapshotService.WatchSnapshotsAsync`) | `DashboardSnapshot` | Sent to all connected clients on every snapshot tick; new connections receive the current snapshot synchronously in `OnConnectedAsync`. | | `AlarmsHub` | `/hubs/alarms` | `AlarmsHubPublisher` (BackgroundService consuming `IGatewayAlarmService.StreamAsync(filter: null)`) | `AlarmFeedMessage` (`active_alarm` / `snapshot_complete` / `transition`) | Connected clients auto-join `__alarms__`; all clients receive every message. Publisher auto-reconnects every 5s on stream faults. | -| `EventsHub` | `/hubs/events` | `DashboardEventBroadcaster` invoked by `EventStreamService` for each event it forwards to a gRPC client | `MxEvent` | Clients call `SubscribeSession(sessionId)` to join `session:{id}`. Events appear only while a gRPC client is also consuming that session's events — the dashboard is a passive mirror, not a separate worker subscriber. | +| `EventsHub` | `/hubs/events` | `DashboardEventBroadcaster` invoked by each session's internal dashboard-mirror subscriber on its `SessionEventDistributor` (registered when the session becomes Ready) | `MxEvent` | Clients call `SubscribeSession(sessionId)` to join `session:{id}`. The dashboard is a first-class distributor subscriber, so it receives the session's events whether or not a gRPC client is streaming. It sees RAW session events — not the per-gRPC-subscriber `AfterWorkerSequence` filtering that `EventStreamService` applies at its own boundary — because the dashboard is a separate LDAP-authenticated monitoring view meant to show the session's full event activity (per-session dashboard ACL is tracked separately). | `DashboardPageBase` opens a `DashboardSnapshotHub` connection via the connection factory in `OnInitializedAsync`, seeds `Snapshot` synchronously from @@ -184,7 +184,8 @@ Default cadences: - snapshot service produces one snapshot per `MxGateway:Dashboard:SnapshotIntervalMilliseconds` (default 1s); - alarm publisher emits on each transition observed by the central monitor; -- event publisher emits per event forwarded by `StreamEvents`. +- event publisher emits per event fanned by the session's `SessionEventDistributor` + to its internal dashboard-mirror subscriber (independent of any gRPC `StreamEvents`). Avoid pushing every MXAccess data-change event into a wider broadcast group. The current design routes events strictly through `session:{id}` groups; the diff --git a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs index 85581f2..6dfab3f 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs @@ -2,7 +2,6 @@ using System.Runtime.CompilerServices; using Microsoft.Extensions.Options; using ZB.MOM.WW.MxGateway.Contracts.Proto; using ZB.MOM.WW.MxGateway.Server.Configuration; -using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs; using ZB.MOM.WW.MxGateway.Server.Metrics; using ZB.MOM.WW.MxGateway.Server.Sessions; using ZB.MOM.WW.MxGateway.Server.Workers; @@ -12,9 +11,7 @@ namespace ZB.MOM.WW.MxGateway.Server.Grpc; public sealed class EventStreamService( ISessionManager sessionManager, IOptions options, - GatewayMetrics metrics, - IDashboardEventBroadcaster dashboardEventBroadcaster, - ILogger logger) : IEventStreamService + GatewayMetrics metrics) : IEventStreamService { /// /// Streams events from a session to the client asynchronously. @@ -26,8 +23,19 @@ public sealed class EventStreamService( /// pump. The pump owns the single drain of /// the worker event stream and the worker→public mapping (mirroring the former /// ProduceEventsAsync); this loop is the per-subscriber boundary that - /// applies the per-RPC filter (AfterWorkerSequence), the dashboard mirror, - /// queue-depth metrics, and the backpressure/overflow policy. + /// applies the per-RPC filter (AfterWorkerSequence), queue-depth metrics, + /// and the backpressure/overflow policy. + /// + /// + /// Task 6 moved the dashboard mirror OFF this per-RPC loop. The dashboard is now a + /// first-class internal subscriber on the session's + /// (see GatewaySession.StartDashboardMirror), + /// so it receives session events even when no gRPC client is streaming. This loop no + /// longer mirrors to the dashboard. One deliberate consequence: the dashboard now sees + /// RAW session events, not the per-gRPC-subscriber AfterWorkerSequence-filtered + /// view this loop applies — the dashboard is a separate LDAP-authenticated monitoring + /// view that should see the session's full event activity (per-session dashboard ACL is + /// the separate Task 18). /// /// /// Overflow handling (Task 5): the distributor's per-subscriber channel is bounded @@ -97,34 +105,12 @@ public sealed class EventStreamService( // Per-RPC filter stays at the subscriber boundary: each request may resume // from a different AfterWorkerSequence, so the shared pump fans raw events and - // this loop drops the ones at or below the caller's watermark. Filtered events - // are not mirrored to the dashboard, matching the pre-Task-4 ordering where - // the skip ran before the dashboard Publish. + // this loop drops the ones at or below the caller's watermark. if (mxEvent.WorkerSequence <= afterWorkerSequence) { continue; } - // Mirror the event to the dashboard EventsHub group for this session. - // Fire-and-forget — broadcast errors must not affect the source gRPC stream. - // Server-041: IDashboardEventBroadcaster documents Publish as never-throw, - // but we enforce that at the seam too so a future implementation that adds - // synchronous validation or a serializer hop cannot fault this loop and end - // the client's gRPC stream. (Task 6 will move this tap onto its own - // distributor subscriber; for Task 4 it coexists here, firing once per - // delivered event for the single subscriber exactly as before.) - try - { - dashboardEventBroadcaster.Publish(session.SessionId, mxEvent); - } - catch (Exception ex) - { - logger.LogDebug( - ex, - "Dashboard event mirror threw for session {SessionId}; continuing.", - session.SessionId); - } - // Queue-depth gauge tracks events the pump has fanned into this subscriber's // channel but the client has not yet consumed — the same "buffered, not yet // delivered" quantity the pre-Task-4 per-RPC channel reported. The bounded diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs index 414d806..c20b624 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs @@ -1,6 +1,8 @@ using System.Runtime.CompilerServices; +using Microsoft.Extensions.Logging; using ZB.MOM.WW.MxGateway.Contracts.Proto; using ZB.MOM.WW.MxGateway.Server.Configuration; +using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs; using ZB.MOM.WW.MxGateway.Server.Grpc; using ZB.MOM.WW.MxGateway.Server.Metrics; using ZB.MOM.WW.MxGateway.Server.Workers; @@ -21,6 +23,10 @@ public sealed class GatewaySession private int _activeEventSubscriberCount; private SessionEventDistributor? _eventDistributor; private bool _eventDistributorStarted; + private bool _dashboardMirrorStarted; + private IEventSubscriberLease? _dashboardMirrorLease; + private Task? _dashboardMirrorTask; + private CancellationTokenSource? _dashboardMirrorCts; private readonly Dictionary<(int ServerHandle, int ItemHandle), SessionItemRegistration> _items = []; /// @@ -350,9 +356,22 @@ public sealed class GatewaySession /// /// Transitions the session to the Ready state. /// + /// + /// On becoming Ready the session starts its internal dashboard mirror (Task 6) when a + /// dashboard broadcaster was supplied. The mirror registers an internal subscriber on + /// the distributor and starts the pump before any gRPC client attaches, so the + /// dashboard EventsHub receives session events even with no gRPC subscriber streaming — + /// fixing the "dark feed" where the dashboard only saw events while a gRPC client was + /// actively streaming. Registering the internal subscriber BEFORE + /// also avoids the Task 4 hazard where + /// starting the pump at Ready with zero subscribers drained a fast-completing worker + /// stream into nothing and left a later subscriber hanging: there is now always a + /// subscriber (the dashboard one) registered before the pump starts. + /// public void MarkReady() { TransitionTo(SessionState.Ready); + StartDashboardMirror(); } // Constructs and starts the distributor exactly once, registering the subscriber under @@ -369,8 +388,24 @@ public sealed class GatewaySession // the start so the very first subscriber sees the stream from its beginning. private IEventSubscriberLease StartDistributorAndRegister() { - SessionEventDistributor distributor; - bool startNow = false; + SessionEventDistributor distributor = EnsureDistributorCreated(out bool startNow); + + // Register BEFORE starting the pump so a subscriber is present when the pump begins + // draining — no event is fanned to an empty subscriber set and then missed by this + // first subscriber. StartAsync only schedules the pump task; it never blocks. + IEventSubscriberLease lease = distributor.Register(); + StartPumpIfRequested(distributor, startNow); + + return lease; + } + + // Constructs the distributor exactly once and reports whether THIS caller is the one + // that should start the pump (i.e. it observed the unstarted state and claimed the + // start). Both the construction and the started-flag flip happen under _syncRoot so two + // concurrent callers (e.g. MarkReady's dashboard mirror and a racing first + // AttachEventSubscriber) agree on a single distributor and a single start. + private SessionEventDistributor EnsureDistributorCreated(out bool startNow) + { lock (_syncRoot) { if (_eventDistributor is null) @@ -387,28 +422,133 @@ public sealed class GatewaySession CreateOverflowHandler(eventOptions.BackpressurePolicy)); } - distributor = _eventDistributor; + startNow = false; if (!_eventDistributorStarted) { _eventDistributorStarted = true; startNow = true; } - } - // Register BEFORE starting the pump so a subscriber is present when the pump begins - // draining — no event is fanned to an empty subscriber set and then missed by this - // first subscriber. StartAsync only schedules the pump task; it never blocks. - IEventSubscriberLease lease = distributor.Register(); - if (startNow) + return _eventDistributor; + } + } + + private static void StartPumpIfRequested(SessionEventDistributor distributor, bool startNow) + { + if (!startNow) { - // StartAsync only schedules the pump via Task.Run and returns a completed task; - // it does not perform any async I/O itself. The sync-over-async call here is - // therefore safe and will not deadlock. Do not make StartAsync truly async - // (i.e., await real I/O before returning) without also changing this call site. - distributor.StartAsync(CancellationToken.None).GetAwaiter().GetResult(); + return; } - return lease; + // StartAsync only schedules the pump via Task.Run and returns a completed task; + // it does not perform any async I/O itself. The sync-over-async call here is + // therefore safe and will not deadlock. Do not make StartAsync truly async + // (i.e., await real I/O before returning) without also changing this call site. + distributor.StartAsync(CancellationToken.None).GetAwaiter().GetResult(); + } + + // Registers the gateway-owned internal dashboard subscriber on the distributor and starts + // a background loop that mirrors every fanned event to the dashboard broadcaster. Called + // once when the session becomes Ready (idempotent). The internal subscriber is registered + // BEFORE the pump starts (see StartDistributorAndRegister / EnsureDistributorCreated), so + // a subscriber is always present at pump start — the dashboard receives events with no + // gRPC subscriber attached, and the Task 4 "zero-subscriber drain into the void" hang + // cannot occur. No-op when no dashboard broadcaster was supplied (unit tests). + private void StartDashboardMirror() + { + IDashboardEventBroadcaster? broadcaster = _eventStreaming.DashboardBroadcaster; + if (broadcaster is null) + { + return; + } + + SessionEventDistributor distributor; + CancellationToken loopToken; + lock (_syncRoot) + { + if (_dashboardMirrorStarted || _state is SessionState.Closing or SessionState.Closed or SessionState.Faulted) + { + return; + } + + _dashboardMirrorStarted = true; + _dashboardMirrorCts = new CancellationTokenSource(); + loopToken = _dashboardMirrorCts.Token; + } + + // Create the distributor (claiming the start if we are first) and register the + // internal subscriber BEFORE starting the pump. isInternal: true keeps the dashboard + // subscriber out of the single-subscriber overflow accounting, so a slow/broken + // dashboard mirror only disconnects itself and never faults the session. + distributor = EnsureDistributorCreated(out bool startNow); + IEventSubscriberLease lease = distributor.Register(isInternal: true); + StartPumpIfRequested(distributor, startNow); + + lock (_syncRoot) + { + _dashboardMirrorLease = lease; + } + + _dashboardMirrorTask = Task.Run( + () => RunDashboardMirrorAsync(broadcaster, lease, loopToken), + CancellationToken.None); + } + + // Reads the internal dashboard subscriber's channel and publishes each RAW fanned event + // to the dashboard broadcaster. The dashboard is a first-class distributor subscriber + // (Task 6), so it sees the session's full raw event activity — NOT the per-gRPC-subscriber + // AfterWorkerSequence filtering that EventStreamService applies at its own boundary. This + // is intentional: the dashboard is a separate LDAP-authenticated monitoring view (per- + // session dashboard ACL is the separate Task 18). Publish is best-effort / never-throw, so + // a slow or broken dashboard cannot fault the session or stall the pump; the bounded + // internal subscriber channel (Task 5 per-subscriber isolation) only disconnects THIS + // mirror on overflow, leaving the session and other subscribers untouched. + private async Task RunDashboardMirrorAsync( + IDashboardEventBroadcaster broadcaster, + IEventSubscriberLease lease, + CancellationToken cancellationToken) + { + try + { + await foreach (MxEvent mxEvent in lease.Reader + .ReadAllAsync(cancellationToken) + .ConfigureAwait(false)) + { + try + { + broadcaster.Publish(SessionId, mxEvent); + } + catch (Exception exception) + { + // Publish is documented never-throw, but enforce it here too so a future + // implementation cannot fault the mirror loop. Logs identifiers only. + _eventStreaming.DistributorLogger.LogDebug( + exception, + "Dashboard event mirror threw for session {SessionId}; continuing.", + SessionId); + } + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Teardown path: the session is shutting down the mirror. + } + catch (SessionManagerException) + { + // The internal subscriber's channel overflowed and the distributor disconnected + // it with a terminal overflow fault. That disconnects only the dashboard mirror; + // the session, pump, and any gRPC subscriber are unaffected. Stop mirroring. + } + catch (Exception exception) + { + // Source-fault completion (worker event stream terminated abnormally) surfaces + // here. The session's own fault handling runs via the gRPC path / lifecycle; the + // mirror just stops. Logs identifiers only. + _eventStreaming.DistributorLogger.LogDebug( + exception, + "Dashboard event mirror loop ended for session {SessionId}.", + SessionId); + } } // Builds the per-subscriber backpressure handler the distributor invokes when a @@ -1108,6 +1248,46 @@ public sealed class GatewaySession { } + // Stop the internal dashboard mirror first: cancel its loop, dispose its lease (which + // unregisters its internal distributor subscriber and completes its channel), and + // await the loop task. Done BEFORE disposing the distributor and worker client — like + // the distributor itself — so the mirror is no longer reading the pump when the pump + // and its source (the worker client) tear down. + IEventSubscriberLease? dashboardLease; + Task? dashboardTask; + CancellationTokenSource? dashboardCts; + lock (_syncRoot) + { + dashboardLease = _dashboardMirrorLease; + dashboardTask = _dashboardMirrorTask; + dashboardCts = _dashboardMirrorCts; + _dashboardMirrorLease = null; + _dashboardMirrorTask = null; + _dashboardMirrorCts = null; + } + + if (dashboardCts is not null) + { + await dashboardCts.CancelAsync().ConfigureAwait(false); + } + + dashboardLease?.Dispose(); + + if (dashboardTask is not null) + { + try + { + await dashboardTask.ConfigureAwait(false); + } + catch (Exception) + { + // The mirror loop swallows its own faults; any escape here must not block + // disposal. The loop has stopped, which is all teardown requires. + } + } + + dashboardCts?.Dispose(); + // Stop the event pump and complete every subscriber channel before tearing down the // worker client (the pump's source). DisposeAsync is the single session teardown // point (SessionManager.RemoveSessionAsync awaits it after close), so awaiting it diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs index 6758a4d..6d90d32 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs @@ -240,7 +240,19 @@ public sealed class SessionEventDistributor : IAsyncDisposable /// subscriber and completes its channel without disturbing the pump or other /// subscribers. /// - public IEventSubscriberLease Register() + /// + /// for a gateway-owned internal subscriber (Task 6: the + /// session's dashboard mirror) that must NOT participate in the single-subscriber + /// overflow accounting. An internal subscriber is excluded from the + /// isOnlySubscriber count, so a lone external gRPC subscriber still reports + /// isOnlySubscriber == true (preserving legacy FailFast session-fault + /// behavior) even while the dashboard subscriber is attached; and an internal + /// subscriber that itself overflows always reports isOnlySubscriber == false, + /// so a slow/broken dashboard can never fault the session — it is merely + /// disconnected from the mirror. Defaults to (external + /// subscriber) so every existing call site is unchanged. + /// + public IEventSubscriberLease Register(bool isInternal = false) { // The pump is the single writer for this channel; readers are single-consumer // (one gRPC stream / dashboard subscriber). Synchronous continuations are @@ -265,7 +277,7 @@ public sealed class SessionEventDistributor : IAsyncDisposable }); long id = Interlocked.Increment(ref _nextSubscriberId); - Subscriber subscriber = new(id, channel); + Subscriber subscriber = new(id, channel, isInternal); // The disposed check AND the map add happen under the same lock with no await // in between. DisposeAsync sets _disposed=true under this same lock before it @@ -411,7 +423,14 @@ public sealed class SessionEventDistributor : IAsyncDisposable // race window opens — a concurrent second registration could cause Count to read as 1 // here even with two subscribers, producing a false FailFast that faults a shared // session. Resolve before enabling multi-subscriber. - bool isOnlySubscriber = _subscribers.Count == 1; + // + // Task 6: the gateway-owned internal dashboard subscriber is excluded from this + // accounting. (a) An internal subscriber that overflows is NEVER the "only subscriber" + // — a slow/broken dashboard must never fault the session, only disconnect its own + // mirror. (b) Internal subscribers are excluded from the count, so a lone external + // gRPC subscriber still reports isOnlySubscriber==true and preserves the legacy + // FailFast session-fault behavior even while the dashboard mirror is attached. + bool isOnlySubscriber = !subscriber.IsInternal && CountExternalSubscribers() == 1; _logger.LogDebug( "Event distributor disconnecting subscriber {SubscriberId} in session {SessionId} after queue overflow (worker sequence {WorkerSequence}).", @@ -446,6 +465,22 @@ public sealed class SessionEventDistributor : IAsyncDisposable } } + // Counts external (non-internal) subscribers. Drives the isOnlySubscriber FailFast + // decision so the gateway-owned internal dashboard subscriber never inflates the count. + private int CountExternalSubscribers() + { + int count = 0; + foreach (Subscriber subscriber in _subscribers.Values) + { + if (!subscriber.IsInternal) + { + count++; + } + } + + return count; + } + private void CompleteAllSubscribers(Exception? error) { foreach (Subscriber subscriber in _subscribers.Values) @@ -593,11 +628,15 @@ public sealed class SessionEventDistributor : IAsyncDisposable private readonly record struct ReplayEntry(MxEvent Event, DateTimeOffset RetainedAt); - private sealed class Subscriber(long id, Channel channel) + private sealed class Subscriber(long id, Channel channel, bool isInternal) { public long Id { get; } = id; public Channel Channel { get; } = channel; + + // True for the gateway-owned internal dashboard subscriber. Excluded from the + // single-subscriber overflow accounting so it cannot fault the session. + public bool IsInternal { get; } = isInternal; } private sealed class SubscriberLease(SessionEventDistributor distributor, Subscriber subscriber) diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventStreaming.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventStreaming.cs index 9b24a68..f7e1120 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventStreaming.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventStreaming.cs @@ -1,5 +1,6 @@ using Microsoft.Extensions.Logging.Abstractions; using ZB.MOM.WW.MxGateway.Server.Configuration; +using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs; using ZB.MOM.WW.MxGateway.Server.Grpc; using ZB.MOM.WW.MxGateway.Server.Metrics; @@ -30,17 +31,25 @@ namespace ZB.MOM.WW.MxGateway.Server.Sessions; /// fault. Carrying it here keeps the distributor decoupled from the metrics type while /// preserving the observability the pre-epic per-RPC overflow path emitted. /// +/// +/// Sink the session's internal dashboard mirror loop (Task 6) publishes raw session +/// MxEvents to. When non-null the session registers an internal distributor +/// subscriber on becoming Ready and mirrors every fanned event to the dashboard +/// EventsHub group regardless of whether a gRPC client is streaming. When null +/// (unit tests that don't exercise the dashboard mirror) no mirror is started. +/// public sealed record SessionEventStreaming( MxAccessGrpcMapper Mapper, EventOptions EventOptions, ILogger DistributorLogger, TimeProvider TimeProvider, - GatewayMetrics Metrics) + GatewayMetrics Metrics, + IDashboardEventBroadcaster? DashboardBroadcaster = null) { /// /// Defaults used when a session is constructed without explicit streaming /// dependencies (unit tests). Uses a fresh mapper, default event options, a no-op - /// logger, the system clock, and a fresh metrics sink. + /// logger, the system clock, a fresh metrics sink, and no dashboard mirror. /// public static SessionEventStreaming Default { get; } = new( new MxAccessGrpcMapper(), diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs index 507b42a..06cd097 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionManager.cs @@ -27,6 +27,7 @@ public sealed class SessionManager : ISessionManager private readonly SemaphoreSlim _sessionSlots; private readonly Grpc.MxAccessGrpcMapper _eventMapper; private readonly ILogger _distributorLogger; + private readonly Dashboard.Hubs.IDashboardEventBroadcaster? _dashboardEventBroadcaster; /// /// Initializes a new instance of . @@ -39,6 +40,12 @@ public sealed class SessionManager : ISessionManager /// Logger. /// Mapper used by each session's event distributor to map worker events to public events. /// Logger passed to each session's event distributor pump. + /// + /// Dashboard SignalR fan-out sink. Each session registers an internal distributor + /// subscriber (Task 6) that mirrors raw session events to this broadcaster, so the + /// dashboard receives events regardless of whether a gRPC client is streaming. Null in + /// unit tests that do not exercise the dashboard mirror. + /// public SessionManager( ISessionRegistry registry, ISessionWorkerClientFactory workerClientFactory, @@ -47,7 +54,8 @@ public sealed class SessionManager : ISessionManager TimeProvider? timeProvider = null, ILogger? logger = null, Grpc.MxAccessGrpcMapper? eventMapper = null, - ILogger? distributorLogger = null) + ILogger? distributorLogger = null, + Dashboard.Hubs.IDashboardEventBroadcaster? dashboardEventBroadcaster = null) { _registry = registry ?? throw new ArgumentNullException(nameof(registry)); _workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory)); @@ -57,6 +65,7 @@ public sealed class SessionManager : ISessionManager _logger = logger ?? NullLogger.Instance; _eventMapper = eventMapper ?? new Grpc.MxAccessGrpcMapper(); _distributorLogger = distributorLogger ?? NullLogger.Instance; + _dashboardEventBroadcaster = dashboardEventBroadcaster; _options = options.Value; _sessionSlots = new SemaphoreSlim(_options.Sessions.MaxSessions, _options.Sessions.MaxSessions); } @@ -451,7 +460,8 @@ public sealed class SessionManager : ISessionManager _options.Events, _distributorLogger, _timeProvider, - _metrics); + _metrics, + _dashboardEventBroadcaster); return new GatewaySession( sessionId, diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs index 16ae96d..d4da987 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs @@ -268,14 +268,13 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests workerClientFactory, options, _metrics, - logger: NullLogger.Instance); + logger: NullLogger.Instance, + dashboardEventBroadcaster: NullDashboardEventBroadcaster.Instance); MxAccessGrpcMapper mapper = new(); EventStreamService eventStreamService = new( sessionManager, options, - _metrics, - NullDashboardEventBroadcaster.Instance, - NullLogger.Instance); + _metrics); Service = new MxAccessGatewayService( sessionManager, diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs index 8dd8c7e..b09d3f9 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs @@ -9,7 +9,6 @@ using ZB.MOM.WW.MxGateway.Server.Grpc; using ZB.MOM.WW.MxGateway.Server.Metrics; using ZB.MOM.WW.MxGateway.Server.Sessions; using ZB.MOM.WW.MxGateway.Server.Workers; -using ZB.MOM.WW.MxGateway.Tests.TestSupport; namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Grpc; @@ -301,81 +300,11 @@ public sealed class EventStreamServiceTests Assert.Equal(1, metrics.GetSnapshot().Faults); } - /// - /// Tests-026 regression: - /// must mirror every yielded event to the - /// - /// seam (the only path that fans events out to dashboard SignalR clients). - /// A regression that silently dropped the Publish call — e.g. an - /// if accidentally added around it, or the broadcaster ctor - /// parameter being removed — would have produced no failing test before - /// this fixture existed. The recording fake captures every call and we - /// assert one publish per yielded event, with the correct session id and - /// preserved WorkerSequence. - /// - [Fact] - public async Task StreamEventsAsync_PublishesEachEventToDashboardBroadcaster() - { - FakeWorkerClient workerClient = new(); - GatewaySession session = CreateReadySession(workerClient); - RecordingDashboardEventBroadcaster recordingBroadcaster = new(); - EventStreamService service = CreateService( - new FakeSessionManager(session), - dashboardEventBroadcaster: recordingBroadcaster); - workerClient.Events.Add(CreateWorkerEvent(sequence: 7, MxEventFamily.OnDataChange)); - workerClient.Events.Add(CreateWorkerEvent(sequence: 8, MxEventFamily.OnWriteComplete)); - workerClient.CompleteAfterConfiguredEvents = true; - - List events = await CollectEventsAsync(service, session.SessionId); - - Assert.Equal([7UL, 8UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray()); - IReadOnlyList captures = recordingBroadcaster.Captures; - Assert.Equal(2, captures.Count); - Assert.All(captures, capture => Assert.Equal(session.SessionId, capture.SessionId)); - Assert.Equal([7UL, 8UL], captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray()); - Assert.Equal(MxEventFamily.OnDataChange, captures[0].MxEvent.Family); - Assert.Equal(MxEventFamily.OnWriteComplete, captures[1].MxEvent.Family); - } - - /// - /// Server-041 regression: must not - /// abort the gRPC stream when the dashboard broadcaster throws. - /// IDashboardEventBroadcaster.Publish is documented as - /// best-effort and never-throw, but the gRPC consumer cannot rely on - /// implementation discipline alone — the seam itself swallows the - /// fault and logs at debug, mirroring the broadcaster's own - /// continuation handler. Without the wrap, the producer loop would - /// surface the exception and the client would see a faulted stream - /// for a dashboard-mirror failure. - /// - [Fact] - public async Task StreamEventsAsync_WhenDashboardBroadcasterThrows_StillYieldsEventsAndDoesNotFaultSession() - { - FakeWorkerClient workerClient = new(); - GatewaySession session = CreateReadySession(workerClient); - using GatewayMetrics metrics = new(); - ThrowingDashboardEventBroadcaster throwingBroadcaster = new(); - EventStreamService service = CreateService( - new FakeSessionManager(session), - metrics, - dashboardEventBroadcaster: throwingBroadcaster); - workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange)); - workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange)); - workerClient.CompleteAfterConfiguredEvents = true; - - List events = await CollectEventsAsync(service, session.SessionId); - - Assert.Equal([1UL, 2UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray()); - Assert.Equal(2, throwingBroadcaster.PublishAttempts); - Assert.NotEqual(SessionState.Faulted, session.State); - } - private static EventStreamService CreateService( FakeSessionManager sessionManager, GatewayMetrics? metrics = null, int queueCapacity = 8, - EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast, - ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster? dashboardEventBroadcaster = null) + EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast) { return new EventStreamService( sessionManager, @@ -387,24 +316,7 @@ public sealed class EventStreamServiceTests BackpressurePolicy = backpressurePolicy, }, }), - metrics ?? new GatewayMetrics(), - dashboardEventBroadcaster ?? NullDashboardEventBroadcaster.Instance, - NullLogger.Instance); - } - - private sealed class ThrowingDashboardEventBroadcaster : ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster - { - /// Gets the count of publish attempts. - public int PublishAttempts { get; private set; } - - /// Increments the attempt count and throws a simulated failure. - /// The session identifier. - /// The event to publish. - public void Publish(string sessionId, MxEvent mxEvent) - { - PublishAttempts++; - throw new InvalidOperationException("simulated dashboard broadcaster failure"); - } + metrics ?? new GatewayMetrics()); } private static async Task> CollectEventsAsync( diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs new file mode 100644 index 0000000..327ff3b --- /dev/null +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/GatewaySessionDashboardMirrorTests.cs @@ -0,0 +1,316 @@ +using System.Runtime.CompilerServices; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using ZB.MOM.WW.MxGateway.Contracts; +using ZB.MOM.WW.MxGateway.Contracts.Proto; +using ZB.MOM.WW.MxGateway.Server.Configuration; +using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs; +using ZB.MOM.WW.MxGateway.Server.Grpc; +using ZB.MOM.WW.MxGateway.Server.Metrics; +using ZB.MOM.WW.MxGateway.Server.Sessions; +using ZB.MOM.WW.MxGateway.Server.Workers; +using ZB.MOM.WW.MxGateway.Tests.TestSupport; + +namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions; + +/// +/// Task 6 regression tests for the internal dashboard mirror. The dashboard is a +/// first-class subscriber on the session's , so it +/// receives session events whether or not a gRPC client is streaming — fixing the +/// "dark feed" where the dashboard only saw events while a gRPC client was actively +/// streaming (the inline per-RPC tap removed by this task). +/// +public sealed class GatewaySessionDashboardMirrorTests +{ + private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5); + + /// + /// The KEY bug-fix test: the dashboard broadcaster receives session events even when + /// NO gRPC StreamEvents subscriber is attached. The session is driven to Ready + /// with a fake worker emitting events; only the internal dashboard subscriber exists. + /// Before Task 6 the mirror lived inside the per-RPC gRPC loop, so with no gRPC + /// subscriber the dashboard saw nothing. + /// + [Fact] + public async Task DashboardMirror_ReceivesEvents_WithNoGrpcSubscriber() + { + FakeWorkerClient workerClient = new(); + workerClient.Events.Add(CreateWorkerEvent(10, MxEventFamily.OnDataChange)); + workerClient.Events.Add(CreateWorkerEvent(11, MxEventFamily.OnWriteComplete)); + workerClient.CompleteAfterConfiguredEvents = true; + RecordingDashboardEventBroadcaster broadcaster = new(); + + await using GatewaySession session = CreateSession(workerClient, broadcaster); + session.AttachWorkerClient(workerClient); + + // MarkReady starts the internal dashboard mirror; no gRPC subscriber is ever attached. + session.MarkReady(); + + await WaitUntilAsync(() => broadcaster.Captures.Count == 2); + + IReadOnlyList captures = broadcaster.Captures; + Assert.Equal(0, session.ActiveEventSubscriberCount); + Assert.Equal([10UL, 11UL], captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray()); + Assert.All(captures, capture => Assert.Equal(session.SessionId, capture.SessionId)); + } + + /// + /// A gRPC subscriber and the dashboard both receive every event concurrently. The + /// gRPC path is no longer the dashboard's source — both read independent leases fed by + /// the single distributor pump. + /// + [Fact] + public async Task DashboardMirror_AndGrpcSubscriber_BothReceiveEvents() + { + FakeWorkerClient workerClient = new(); + workerClient.Events.Add(CreateWorkerEvent(1, MxEventFamily.OnDataChange)); + workerClient.Events.Add(CreateWorkerEvent(2, MxEventFamily.OnDataChange)); + workerClient.Events.Add(CreateWorkerEvent(3, MxEventFamily.OnWriteComplete)); + workerClient.CompleteAfterConfiguredEvents = true; + RecordingDashboardEventBroadcaster broadcaster = new(); + + await using GatewaySession session = CreateSession(workerClient, broadcaster); + session.AttachWorkerClient(workerClient); + session.MarkReady(); + + EventStreamService service = new( + new SingleSessionManager(session), + Options.Create(new GatewayOptions { Events = new EventOptions { QueueCapacity = 8 } }), + new GatewayMetrics()); + + List grpcEvents = []; + await foreach (MxEvent mxEvent in service + .StreamEventsAsync(new StreamEventsRequest { SessionId = session.SessionId }, CancellationToken.None) + .WithCancellation(CancellationToken.None)) + { + grpcEvents.Add(mxEvent); + } + + await WaitUntilAsync(() => broadcaster.Captures.Count == 3); + + Assert.Equal([1UL, 2UL, 3UL], grpcEvents.Select(mxEvent => mxEvent.WorkerSequence).ToArray()); + Assert.Equal([1UL, 2UL, 3UL], broadcaster.Captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray()); + } + + /// + /// Task 4 hazard guard: starting the pump at Ready with a fast-completing worker stream + /// and zero subscribers used to drain into nothing and leave a later subscriber hanging. + /// Now the dashboard subscriber is registered BEFORE the pump starts, so even a worker + /// stream that completes immediately delivers every event to the dashboard with no hang. + /// + [Fact] + public async Task DashboardMirror_FastCompletingWorkerStream_DeliversAllEventsWithoutHang() + { + FakeWorkerClient workerClient = new(); + workerClient.Events.Add(CreateWorkerEvent(1, MxEventFamily.OnDataChange)); + workerClient.Events.Add(CreateWorkerEvent(2, MxEventFamily.OnDataChange)); + workerClient.CompleteAfterConfiguredEvents = true; + RecordingDashboardEventBroadcaster broadcaster = new(); + + await using GatewaySession session = CreateSession(workerClient, broadcaster); + session.AttachWorkerClient(workerClient); + session.MarkReady(); + + await WaitUntilAsync(() => broadcaster.Captures.Count == 2); + Assert.Equal([1UL, 2UL], broadcaster.Captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray()); + } + + /// + /// The dashboard Publish must be never-throw at the seam too: a throwing broadcaster + /// must not fault the session or stop the mirror from continuing past the failure. + /// + [Fact] + public async Task DashboardMirror_WhenBroadcasterThrows_DoesNotFaultSessionAndKeepsMirroring() + { + FakeWorkerClient workerClient = new(); + workerClient.Events.Add(CreateWorkerEvent(1, MxEventFamily.OnDataChange)); + workerClient.Events.Add(CreateWorkerEvent(2, MxEventFamily.OnDataChange)); + workerClient.CompleteAfterConfiguredEvents = true; + ThrowingDashboardEventBroadcaster broadcaster = new(); + + await using GatewaySession session = CreateSession(workerClient, broadcaster); + session.AttachWorkerClient(workerClient); + session.MarkReady(); + + await WaitUntilAsync(() => broadcaster.PublishAttempts == 2); + Assert.NotEqual(SessionState.Faulted, session.State); + } + + /// + /// The internal dashboard subscriber must NOT count against the single-subscriber + /// guard: a gRPC subscriber can still attach while the dashboard mirror is running. + /// + [Fact] + public async Task DashboardMirror_DoesNotCountAgainstSingleSubscriberGuard() + { + FakeWorkerClient workerClient = new(); + RecordingDashboardEventBroadcaster broadcaster = new(); + + await using GatewaySession session = CreateSession(workerClient, broadcaster); + session.AttachWorkerClient(workerClient); + session.MarkReady(); + + Assert.Equal(0, session.ActiveEventSubscriberCount); + using IEventSubscriberLease lease = session.AttachEventSubscriber(allowMultipleSubscribers: false); + Assert.Equal(1, session.ActiveEventSubscriberCount); + } + + private static GatewaySession CreateSession( + IWorkerClient workerClient, + IDashboardEventBroadcaster broadcaster) + { + return new GatewaySession( + sessionId: "session-dashboard-mirror", + backendName: GatewayContractInfo.DefaultBackendName, + pipeName: "mxaccess-gateway-1-session-dashboard-mirror", + nonce: "nonce", + clientIdentity: "client-1", + ownerKeyId: null, + clientSessionName: "test-session", + clientCorrelationId: "client-correlation-1", + commandTimeout: TimeSpan.FromSeconds(5), + startupTimeout: TimeSpan.FromSeconds(5), + shutdownTimeout: TimeSpan.FromSeconds(5), + leaseDuration: TimeSpan.FromMinutes(30), + openedAt: DateTimeOffset.UtcNow, + eventStreaming: new SessionEventStreaming( + new MxAccessGrpcMapper(), + new EventOptions { QueueCapacity = 8 }, + NullLogger.Instance, + TimeProvider.System, + new GatewayMetrics(), + broadcaster)); + } + + private static WorkerEvent CreateWorkerEvent(ulong sequence, MxEventFamily family) + { + MxEvent mxEvent = new() + { + SessionId = "session-dashboard-mirror", + Family = family, + WorkerSequence = sequence, + }; + + switch (family) + { + case MxEventFamily.OnDataChange: + mxEvent.OnDataChange = new OnDataChangeEvent(); + break; + case MxEventFamily.OnWriteComplete: + mxEvent.OnWriteComplete = new OnWriteCompleteEvent(); + break; + } + + return new WorkerEvent { Event = mxEvent }; + } + + private static async Task WaitUntilAsync(Func predicate) + { + using CancellationTokenSource cancellationTokenSource = new(TestTimeout); + while (!predicate()) + { + await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token); + } + } + + private sealed class ThrowingDashboardEventBroadcaster : IDashboardEventBroadcaster + { + private int _publishAttempts; + + public int PublishAttempts => Volatile.Read(ref _publishAttempts); + + public void Publish(string sessionId, MxEvent mxEvent) + { + Interlocked.Increment(ref _publishAttempts); + throw new InvalidOperationException("simulated dashboard broadcaster failure"); + } + } + + private sealed class SingleSessionManager(GatewaySession session) : ISessionManager + { + public Task OpenSessionAsync( + SessionOpenRequest request, + string? clientIdentity, + string? ownerKeyId, + CancellationToken cancellationToken) => Task.FromResult(session); + + public bool TryGetSession(string sessionId, out GatewaySession gatewaySession) + { + gatewaySession = session; + return string.Equals(sessionId, session.SessionId, StringComparison.Ordinal); + } + + public Task InvokeAsync( + string sessionId, + WorkerCommand command, + CancellationToken cancellationToken) => Task.FromResult(new WorkerCommandReply()); + + public IAsyncEnumerable ReadEventsAsync( + string sessionId, + CancellationToken cancellationToken) => session.ReadEventsAsync(cancellationToken); + + public Task CloseSessionAsync( + string sessionId, + CancellationToken cancellationToken) => + Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false)); + + public Task KillWorkerAsync( + string sessionId, + string reason, + CancellationToken cancellationToken) => + Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false)); + + public Task CloseExpiredLeasesAsync( + DateTimeOffset now, + CancellationToken cancellationToken) => Task.FromResult(0); + + public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask; + } + + private sealed class FakeWorkerClient : IWorkerClient + { + public List Events { get; } = []; + + public bool CompleteAfterConfiguredEvents { get; set; } + + public string SessionId { get; } = "session-dashboard-mirror"; + + public int? ProcessId { get; } = 1234; + + public WorkerClientState State { get; } = WorkerClientState.Ready; + + public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow; + + public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask; + + public Task InvokeAsync( + WorkerCommand command, + TimeSpan timeout, + CancellationToken cancellationToken) => Task.FromResult(new WorkerCommandReply()); + + public async IAsyncEnumerable ReadEventsAsync( + [EnumeratorCancellation] CancellationToken cancellationToken) + { + foreach (WorkerEvent workerEvent in Events) + { + cancellationToken.ThrowIfCancellationRequested(); + yield return workerEvent; + } + + if (CompleteAfterConfiguredEvents) + { + yield break; + } + + await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken); + } + + public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask; + + public void Kill(string reason) + { + } + + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } +}