feat(dashboard): mirror events via SessionEventDistributor subscriber (fixes dark feed without gRPC client)

This commit is contained in:
Joseph Doherty
2026-06-15 14:42:32 -04:00
parent 4f43733b96
commit 1ea08c3b10
9 changed files with 600 additions and 148 deletions
@@ -268,14 +268,13 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests
workerClientFactory,
options,
_metrics,
logger: NullLogger<SessionManager>.Instance);
logger: NullLogger<SessionManager>.Instance,
dashboardEventBroadcaster: NullDashboardEventBroadcaster.Instance);
MxAccessGrpcMapper mapper = new();
EventStreamService eventStreamService = new(
sessionManager,
options,
_metrics,
NullDashboardEventBroadcaster.Instance,
NullLogger<EventStreamService>.Instance);
_metrics);
Service = new MxAccessGatewayService(
sessionManager,
@@ -9,7 +9,6 @@ using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Grpc;
@@ -301,81 +300,11 @@ public sealed class EventStreamServiceTests
Assert.Equal(1, metrics.GetSnapshot().Faults);
}
/// <summary>
/// Tests-026 regression: <see cref="EventStreamService.StreamEventsAsync"/>
/// must mirror every yielded event to the
/// <see cref="ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster"/>
/// seam (the only path that fans events out to dashboard SignalR clients).
/// A regression that silently dropped the <c>Publish</c> call — e.g. an
/// <c>if</c> accidentally added around it, or the broadcaster ctor
/// parameter being removed — would have produced no failing test before
/// this fixture existed. The recording fake captures every call and we
/// assert one publish per yielded event, with the correct session id and
/// preserved <c>WorkerSequence</c>.
/// </summary>
[Fact]
public async Task StreamEventsAsync_PublishesEachEventToDashboardBroadcaster()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
RecordingDashboardEventBroadcaster recordingBroadcaster = new();
EventStreamService service = CreateService(
new FakeSessionManager(session),
dashboardEventBroadcaster: recordingBroadcaster);
workerClient.Events.Add(CreateWorkerEvent(sequence: 7, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 8, MxEventFamily.OnWriteComplete));
workerClient.CompleteAfterConfiguredEvents = true;
List<MxEvent> events = await CollectEventsAsync(service, session.SessionId);
Assert.Equal([7UL, 8UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray());
IReadOnlyList<DashboardEventCapture> captures = recordingBroadcaster.Captures;
Assert.Equal(2, captures.Count);
Assert.All(captures, capture => Assert.Equal(session.SessionId, capture.SessionId));
Assert.Equal([7UL, 8UL], captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray());
Assert.Equal(MxEventFamily.OnDataChange, captures[0].MxEvent.Family);
Assert.Equal(MxEventFamily.OnWriteComplete, captures[1].MxEvent.Family);
}
/// <summary>
/// Server-041 regression: <see cref="EventStreamService"/> must not
/// abort the gRPC stream when the dashboard broadcaster throws.
/// <c>IDashboardEventBroadcaster.Publish</c> is documented as
/// best-effort and never-throw, but the gRPC consumer cannot rely on
/// implementation discipline alone — the seam itself swallows the
/// fault and logs at debug, mirroring the broadcaster's own
/// continuation handler. Without the wrap, the producer loop would
/// surface the exception and the client would see a faulted stream
/// for a dashboard-mirror failure.
/// </summary>
[Fact]
public async Task StreamEventsAsync_WhenDashboardBroadcasterThrows_StillYieldsEventsAndDoesNotFaultSession()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
using GatewayMetrics metrics = new();
ThrowingDashboardEventBroadcaster throwingBroadcaster = new();
EventStreamService service = CreateService(
new FakeSessionManager(session),
metrics,
dashboardEventBroadcaster: throwingBroadcaster);
workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
workerClient.CompleteAfterConfiguredEvents = true;
List<MxEvent> events = await CollectEventsAsync(service, session.SessionId);
Assert.Equal([1UL, 2UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray());
Assert.Equal(2, throwingBroadcaster.PublishAttempts);
Assert.NotEqual(SessionState.Faulted, session.State);
}
private static EventStreamService CreateService(
FakeSessionManager sessionManager,
GatewayMetrics? metrics = null,
int queueCapacity = 8,
EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast,
ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster? dashboardEventBroadcaster = null)
EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast)
{
return new EventStreamService(
sessionManager,
@@ -387,24 +316,7 @@ public sealed class EventStreamServiceTests
BackpressurePolicy = backpressurePolicy,
},
}),
metrics ?? new GatewayMetrics(),
dashboardEventBroadcaster ?? NullDashboardEventBroadcaster.Instance,
NullLogger<EventStreamService>.Instance);
}
private sealed class ThrowingDashboardEventBroadcaster : ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster
{
/// <summary>Gets the count of publish attempts.</summary>
public int PublishAttempts { get; private set; }
/// <summary>Increments the attempt count and throws a simulated failure.</summary>
/// <param name="sessionId">The session identifier.</param>
/// <param name="mxEvent">The event to publish.</param>
public void Publish(string sessionId, MxEvent mxEvent)
{
PublishAttempts++;
throw new InvalidOperationException("simulated dashboard broadcaster failure");
}
metrics ?? new GatewayMetrics());
}
private static async Task<List<MxEvent>> CollectEventsAsync(
@@ -0,0 +1,316 @@
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.MxGateway.Contracts;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions;
/// <summary>
/// Task 6 regression tests for the internal dashboard mirror. The dashboard is a
/// first-class subscriber on the session's <see cref="SessionEventDistributor"/>, so it
/// receives session events whether or not a gRPC client is streaming — fixing the
/// "dark feed" where the dashboard only saw events while a gRPC client was actively
/// streaming (the inline per-RPC tap removed by this task).
/// </summary>
public sealed class GatewaySessionDashboardMirrorTests
{
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
/// <summary>
/// The KEY bug-fix test: the dashboard broadcaster receives session events even when
/// NO gRPC <c>StreamEvents</c> subscriber is attached. The session is driven to Ready
/// with a fake worker emitting events; only the internal dashboard subscriber exists.
/// Before Task 6 the mirror lived inside the per-RPC gRPC loop, so with no gRPC
/// subscriber the dashboard saw nothing.
/// </summary>
[Fact]
public async Task DashboardMirror_ReceivesEvents_WithNoGrpcSubscriber()
{
FakeWorkerClient workerClient = new();
workerClient.Events.Add(CreateWorkerEvent(10, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(11, MxEventFamily.OnWriteComplete));
workerClient.CompleteAfterConfiguredEvents = true;
RecordingDashboardEventBroadcaster broadcaster = new();
await using GatewaySession session = CreateSession(workerClient, broadcaster);
session.AttachWorkerClient(workerClient);
// MarkReady starts the internal dashboard mirror; no gRPC subscriber is ever attached.
session.MarkReady();
await WaitUntilAsync(() => broadcaster.Captures.Count == 2);
IReadOnlyList<DashboardEventCapture> captures = broadcaster.Captures;
Assert.Equal(0, session.ActiveEventSubscriberCount);
Assert.Equal([10UL, 11UL], captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray());
Assert.All(captures, capture => Assert.Equal(session.SessionId, capture.SessionId));
}
/// <summary>
/// A gRPC subscriber and the dashboard both receive every event concurrently. The
/// gRPC path is no longer the dashboard's source — both read independent leases fed by
/// the single distributor pump.
/// </summary>
[Fact]
public async Task DashboardMirror_AndGrpcSubscriber_BothReceiveEvents()
{
FakeWorkerClient workerClient = new();
workerClient.Events.Add(CreateWorkerEvent(1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(2, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(3, MxEventFamily.OnWriteComplete));
workerClient.CompleteAfterConfiguredEvents = true;
RecordingDashboardEventBroadcaster broadcaster = new();
await using GatewaySession session = CreateSession(workerClient, broadcaster);
session.AttachWorkerClient(workerClient);
session.MarkReady();
EventStreamService service = new(
new SingleSessionManager(session),
Options.Create(new GatewayOptions { Events = new EventOptions { QueueCapacity = 8 } }),
new GatewayMetrics());
List<MxEvent> grpcEvents = [];
await foreach (MxEvent mxEvent in service
.StreamEventsAsync(new StreamEventsRequest { SessionId = session.SessionId }, CancellationToken.None)
.WithCancellation(CancellationToken.None))
{
grpcEvents.Add(mxEvent);
}
await WaitUntilAsync(() => broadcaster.Captures.Count == 3);
Assert.Equal([1UL, 2UL, 3UL], grpcEvents.Select(mxEvent => mxEvent.WorkerSequence).ToArray());
Assert.Equal([1UL, 2UL, 3UL], broadcaster.Captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray());
}
/// <summary>
/// Task 4 hazard guard: starting the pump at Ready with a fast-completing worker stream
/// and zero subscribers used to drain into nothing and leave a later subscriber hanging.
/// Now the dashboard subscriber is registered BEFORE the pump starts, so even a worker
/// stream that completes immediately delivers every event to the dashboard with no hang.
/// </summary>
[Fact]
public async Task DashboardMirror_FastCompletingWorkerStream_DeliversAllEventsWithoutHang()
{
FakeWorkerClient workerClient = new();
workerClient.Events.Add(CreateWorkerEvent(1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(2, MxEventFamily.OnDataChange));
workerClient.CompleteAfterConfiguredEvents = true;
RecordingDashboardEventBroadcaster broadcaster = new();
await using GatewaySession session = CreateSession(workerClient, broadcaster);
session.AttachWorkerClient(workerClient);
session.MarkReady();
await WaitUntilAsync(() => broadcaster.Captures.Count == 2);
Assert.Equal([1UL, 2UL], broadcaster.Captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray());
}
/// <summary>
/// The dashboard Publish must be never-throw at the seam too: a throwing broadcaster
/// must not fault the session or stop the mirror from continuing past the failure.
/// </summary>
[Fact]
public async Task DashboardMirror_WhenBroadcasterThrows_DoesNotFaultSessionAndKeepsMirroring()
{
FakeWorkerClient workerClient = new();
workerClient.Events.Add(CreateWorkerEvent(1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(2, MxEventFamily.OnDataChange));
workerClient.CompleteAfterConfiguredEvents = true;
ThrowingDashboardEventBroadcaster broadcaster = new();
await using GatewaySession session = CreateSession(workerClient, broadcaster);
session.AttachWorkerClient(workerClient);
session.MarkReady();
await WaitUntilAsync(() => broadcaster.PublishAttempts == 2);
Assert.NotEqual(SessionState.Faulted, session.State);
}
/// <summary>
/// The internal dashboard subscriber must NOT count against the single-subscriber
/// guard: a gRPC subscriber can still attach while the dashboard mirror is running.
/// </summary>
[Fact]
public async Task DashboardMirror_DoesNotCountAgainstSingleSubscriberGuard()
{
FakeWorkerClient workerClient = new();
RecordingDashboardEventBroadcaster broadcaster = new();
await using GatewaySession session = CreateSession(workerClient, broadcaster);
session.AttachWorkerClient(workerClient);
session.MarkReady();
Assert.Equal(0, session.ActiveEventSubscriberCount);
using IEventSubscriberLease lease = session.AttachEventSubscriber(allowMultipleSubscribers: false);
Assert.Equal(1, session.ActiveEventSubscriberCount);
}
private static GatewaySession CreateSession(
IWorkerClient workerClient,
IDashboardEventBroadcaster broadcaster)
{
return new GatewaySession(
sessionId: "session-dashboard-mirror",
backendName: GatewayContractInfo.DefaultBackendName,
pipeName: "mxaccess-gateway-1-session-dashboard-mirror",
nonce: "nonce",
clientIdentity: "client-1",
ownerKeyId: null,
clientSessionName: "test-session",
clientCorrelationId: "client-correlation-1",
commandTimeout: TimeSpan.FromSeconds(5),
startupTimeout: TimeSpan.FromSeconds(5),
shutdownTimeout: TimeSpan.FromSeconds(5),
leaseDuration: TimeSpan.FromMinutes(30),
openedAt: DateTimeOffset.UtcNow,
eventStreaming: new SessionEventStreaming(
new MxAccessGrpcMapper(),
new EventOptions { QueueCapacity = 8 },
NullLogger<SessionEventDistributor>.Instance,
TimeProvider.System,
new GatewayMetrics(),
broadcaster));
}
private static WorkerEvent CreateWorkerEvent(ulong sequence, MxEventFamily family)
{
MxEvent mxEvent = new()
{
SessionId = "session-dashboard-mirror",
Family = family,
WorkerSequence = sequence,
};
switch (family)
{
case MxEventFamily.OnDataChange:
mxEvent.OnDataChange = new OnDataChangeEvent();
break;
case MxEventFamily.OnWriteComplete:
mxEvent.OnWriteComplete = new OnWriteCompleteEvent();
break;
}
return new WorkerEvent { Event = mxEvent };
}
private static async Task WaitUntilAsync(Func<bool> predicate)
{
using CancellationTokenSource cancellationTokenSource = new(TestTimeout);
while (!predicate())
{
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
}
}
private sealed class ThrowingDashboardEventBroadcaster : IDashboardEventBroadcaster
{
private int _publishAttempts;
public int PublishAttempts => Volatile.Read(ref _publishAttempts);
public void Publish(string sessionId, MxEvent mxEvent)
{
Interlocked.Increment(ref _publishAttempts);
throw new InvalidOperationException("simulated dashboard broadcaster failure");
}
}
private sealed class SingleSessionManager(GatewaySession session) : ISessionManager
{
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
string? ownerKeyId,
CancellationToken cancellationToken) => Task.FromResult(session);
public bool TryGetSession(string sessionId, out GatewaySession gatewaySession)
{
gatewaySession = session;
return string.Equals(sessionId, session.SessionId, StringComparison.Ordinal);
}
public Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken) => Task.FromResult(new WorkerCommandReply());
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken) => session.ReadEventsAsync(cancellationToken);
public Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken) =>
Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
public Task<SessionCloseResult> KillWorkerAsync(
string sessionId,
string reason,
CancellationToken cancellationToken) =>
Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
public Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken) => Task.FromResult(0);
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
private sealed class FakeWorkerClient : IWorkerClient
{
public List<WorkerEvent> Events { get; } = [];
public bool CompleteAfterConfiguredEvents { get; set; }
public string SessionId { get; } = "session-dashboard-mirror";
public int? ProcessId { get; } = 1234;
public WorkerClientState State { get; } = WorkerClientState.Ready;
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken) => Task.FromResult(new WorkerCommandReply());
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
foreach (WorkerEvent workerEvent in Events)
{
cancellationToken.ThrowIfCancellationRequested();
yield return workerEvent;
}
if (CompleteAfterConfiguredEvents)
{
yield break;
}
await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
}
public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask;
public void Kill(string reason)
{
}
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
}