dashboard: clear deferred items — EventsHub publisher + doc refresh

EventsHub publisher (closes the v2.1 follow-up flagged in the previous commit)

EventStreamService now mirrors every MxEvent it forwards to a gRPC client
into the `EventsHub` group for the session. The fan-out goes through a new
singleton `IDashboardEventBroadcaster`:

  * IDashboardEventBroadcaster — abstraction so EventStreamService doesn't
    take a direct dependency on SignalR.
  * DashboardEventBroadcaster — singleton implementation that hands the
    SendAsync to IHubContext<EventsHub> as fire-and-forget. Errors are
    logged at debug and dropped so the source gRPC stream is never
    blocked.

EventStreamService now takes IDashboardEventBroadcaster as a ctor parameter
and calls Publish(sessionId, publicEvent) once per event after sequence
filtering, before the bounded queue write. Test fixtures and the live
integration harness pass NullDashboardEventBroadcaster.Instance so the
broadcaster is a no-op in unit tests.

SessionDetailsPage adds a "Recent events" panel:
  * implements IAsyncDisposable
  * opens a second HubConnection via DashboardHubConnectionFactory targeting
    /hubs/events
  * calls SubscribeSession(SessionId) on Start
  * renders the most recent 50 events in a small table (worker seq, family,
    server/item handle, alarm reference when the event is OnAlarmTransition)
  * shows a live/offline conn-pill driven by HubConnection.Closed /
    Reconnected events

The dashboard mirror is intentionally passive — events appear only while a
gRPC client is also consuming that session's events. Documented as such in
the empty-state copy and in GatewayDashboardDesign.md.

Documentation refresh

Every doc that referenced the retired options (PathBase, RequireAdminScope,
RequiredGroup) and the old API-key-cookie auth flow is updated to describe
the new model:

  * CLAUDE.md — Authentication section now explains LDAP bind +
    GroupToRole + HubToken bearer flow.
  * gateway.md — Dashboard section: root-mounted routes, snapshot/alarms/
    events SignalR hubs, LDAP cookie + bearer scheme.
  * docs/GatewayConfiguration.md — drop PathBase / RequireAdminScope rows,
    add GroupToRole row, append "Authorization policies" and "SignalR hubs"
    subsections describing the three policies and the /hubs/* endpoints.
  * docs/GatewayDashboardDesign.md — hosting model (root mount, new
    endpoint layout), Realtime Updates rewritten as a hub table
    (DashboardSnapshotHub / AlarmsHub / EventsHub with producers, payloads,
    and routing), Authentication And Authorization rewritten around LDAP +
    role mapping + the hub bearer flow, Configuration block updated.
  * docs/GatewayProcessDesign.md — security-section dashboard paragraph
    and the example config block both refreshed to LDAP/role auth.
  * docs/ImplementationPlanGateway.md — dashboard-auth deliverable list
    updated (LDAP bind + GroupToRole + /hubs/token bearer mint replace the
    API-key login flow).
  * docs/GatewayTesting.md — DashboardLdapLiveTests blurb describes the
    GroupToRole fixture (`{ GwAdmin: Admin }`) instead of the retired
    RequiredGroup default; success-path assertion explains the role-claim
    check.

Verification: 475 server tests, 275 worker tests (+ 9 dev-rig skips), 18
integration tests (live MxAccess + LDAP + Galaxy) all pass — including the
live worker smoke test fixture that now constructs EventStreamService with
the new broadcaster parameter.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-24 02:07:30 -04:00
parent 65943597d4
commit d692232191
15 changed files with 471 additions and 129 deletions
@@ -1082,6 +1082,7 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
options,
mapper,
_metrics,
NullDashboardEventBroadcaster.Instance,
_loggerFactory.CreateLogger<EventStreamService>());
Service = new MxAccessGatewayService(
@@ -1593,4 +1594,10 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
ConstraintFailure failure,
CancellationToken cancellationToken) => Task.CompletedTask;
}
private sealed class NullDashboardEventBroadcaster : ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster
{
public static readonly NullDashboardEventBroadcaster Instance = new();
public void Publish(string sessionId, ZB.MOM.WW.MxGateway.Contracts.Proto.MxEvent mxEvent) { }
}
}
@@ -1,5 +1,9 @@
@page "/sessions/{SessionId}"
@inherits DashboardPageBase
@implements IAsyncDisposable
@using Microsoft.AspNetCore.SignalR.Client
@using ZB.MOM.WW.MxGateway.Contracts.Proto
@using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs
<PageTitle>Dashboard Session</PageTitle>
@@ -59,12 +63,150 @@ else
</table>
</div>
</section>
<section class="dashboard-section">
<div class="section-heading">
<h2>Recent events</h2>
<span class="conn-pill" data-state="@(_eventsConnected ? "connected" : "disconnected")">
<span class="dot"></span>
<span>@(_eventsConnected ? "live" : "offline")</span>
</span>
</div>
@if (_recentEvents.Count == 0)
{
<div class="empty-state">
Waiting for events. The dashboard mirrors the session's gRPC event stream — events
appear here only while a gRPC client is also consuming this session's events.
</div>
}
else
{
<div class="table-responsive">
<table class="table table-sm dashboard-table">
<thead>
<tr>
<th scope="col">Worker seq</th>
<th scope="col">Family</th>
<th scope="col">Server</th>
<th scope="col">Item</th>
<th scope="col">Status</th>
</tr>
</thead>
<tbody>
@foreach (MxEvent evt in _recentEvents)
{
<tr>
<td class="num mono">@evt.WorkerSequence</td>
<td>@evt.Family</td>
<td class="num mono">@evt.ServerHandle</td>
<td class="num mono">@evt.ItemHandle</td>
<td>@DashboardDisplay.Text(EventStatusLabel(evt))</td>
</tr>
}
</tbody>
</table>
</div>
}
</section>
}
@code {
private const int MaxRecentEvents = 50;
[Parameter]
public string SessionId { get; set; } = string.Empty;
private DashboardSessionSummary? CurrentSession => Snapshot?.Sessions.FirstOrDefault(session =>
string.Equals(session.SessionId, SessionId, StringComparison.Ordinal));
private HubConnection? _eventsHub;
private bool _eventsConnected;
private string? _subscribedSessionId;
private readonly LinkedList<MxEvent> _recentEvents = new();
protected override async Task OnParametersSetAsync()
{
if (!string.Equals(_subscribedSessionId, SessionId, StringComparison.Ordinal))
{
await DetachEventsHubAsync().ConfigureAwait(false);
await AttachEventsHubAsync().ConfigureAwait(false);
}
}
private async Task AttachEventsHubAsync()
{
if (string.IsNullOrWhiteSpace(SessionId))
{
return;
}
_eventsHub = HubFactory.Create("/hubs/events");
_eventsHub.On<MxEvent>(EventsHub.EventMessage, async mxEvent =>
{
_recentEvents.AddFirst(mxEvent);
while (_recentEvents.Count > MaxRecentEvents)
{
_recentEvents.RemoveLast();
}
await InvokeAsync(StateHasChanged).ConfigureAwait(false);
});
_eventsHub.Closed += _ =>
{
_eventsConnected = false;
return InvokeAsync(StateHasChanged);
};
_eventsHub.Reconnected += _ =>
{
_eventsConnected = true;
return InvokeAsync(StateHasChanged);
};
try
{
await _eventsHub.StartAsync().ConfigureAwait(false);
await _eventsHub.SendAsync("SubscribeSession", SessionId).ConfigureAwait(false);
_eventsConnected = true;
_subscribedSessionId = SessionId;
}
catch
{
_eventsConnected = false;
}
}
private async Task DetachEventsHubAsync()
{
HubConnection? hub = _eventsHub;
_eventsHub = null;
_eventsConnected = false;
_subscribedSessionId = null;
_recentEvents.Clear();
if (hub is not null)
{
try
{
await hub.DisposeAsync().ConfigureAwait(false);
}
catch
{
// Disposal-time errors are best-effort.
}
}
}
private static string EventStatusLabel(MxEvent evt)
{
return evt.Family == MxEventFamily.OnAlarmTransition
? evt.OnAlarmTransition?.AlarmFullReference ?? string.Empty
: string.Empty;
}
public new async ValueTask DisposeAsync()
{
await DetachEventsHubAsync().ConfigureAwait(false);
await base.DisposeAsync().ConfigureAwait(false);
}
}
@@ -22,6 +22,7 @@ public static class DashboardServiceCollectionExtensions
services.AddSingleton<IDashboardApiKeyManagementService, DashboardApiKeyManagementService>();
services.AddSingleton<HubTokenService>();
services.AddScoped<Hubs.DashboardHubConnectionFactory>();
services.AddSingleton<Hubs.IDashboardEventBroadcaster, Hubs.DashboardEventBroadcaster>();
services.AddHostedService<Hubs.DashboardSnapshotPublisher>();
services.AddHostedService<Hubs.AlarmsHubPublisher>();
services.AddHttpContextAccessor();
@@ -0,0 +1,41 @@
using Microsoft.AspNetCore.SignalR;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
namespace ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
/// <summary>
/// Broadcasts MxEvents to <see cref="EventsHub"/> clients subscribed to the
/// session's group. Fire-and-forget: we hand the send to the hub context
/// and return immediately so the source gRPC stream is never blocked.
/// Errors are logged once and dropped — keeping the SignalR mirror best-effort
/// preserves the gRPC contract that exists today.
/// </summary>
public sealed class DashboardEventBroadcaster(
IHubContext<EventsHub> hubContext,
ILogger<DashboardEventBroadcaster> logger) : IDashboardEventBroadcaster
{
public void Publish(string sessionId, MxEvent mxEvent)
{
if (string.IsNullOrEmpty(sessionId) || mxEvent is null)
{
return;
}
Task send = hubContext.Clients
.Group(EventsHub.GroupName(sessionId))
.SendAsync(EventsHub.EventMessage, mxEvent);
if (!send.IsCompletedSuccessfully)
{
_ = send.ContinueWith(
t =>
{
if (t.Exception is { } ex)
{
logger.LogDebug(ex, "Dashboard event mirror to session {SessionId} failed.", sessionId);
}
},
TaskScheduler.Default);
}
}
}
@@ -0,0 +1,14 @@
using ZB.MOM.WW.MxGateway.Contracts.Proto;
namespace ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
/// <summary>
/// Fan-out point for MxEvents that should be mirrored to dashboard
/// SignalR clients subscribed to a session's events group. Implementations
/// must never throw — broadcast failures are best-effort and must not
/// disrupt the source gRPC stream.
/// </summary>
public interface IDashboardEventBroadcaster
{
void Publish(string sessionId, MxEvent mxEvent);
}
@@ -3,6 +3,7 @@ using System.Threading.Channels;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
@@ -14,6 +15,7 @@ public sealed class EventStreamService(
IOptions<GatewayOptions> options,
MxAccessGrpcMapper mapper,
GatewayMetrics metrics,
IDashboardEventBroadcaster dashboardEventBroadcaster,
ILogger<EventStreamService> logger) : IEventStreamService
{
/// <summary>
@@ -118,6 +120,11 @@ public sealed class EventStreamService(
continue;
}
// Mirror the event to the dashboard EventsHub group for this
// session. Fire-and-forget — broadcast errors must not affect
// the source gRPC stream.
dashboardEventBroadcaster.Publish(session.SessionId, publicEvent);
if (!writer.TryWrite(publicEvent))
{
string message = $"Session {session.SessionId} event stream queue overflowed.";
@@ -178,6 +178,7 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests
options,
mapper,
_metrics,
NullDashboardEventBroadcaster.Instance,
NullLogger<EventStreamService>.Instance);
Service = new MxAccessGatewayService(
@@ -413,4 +414,9 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests
}
}
private sealed class NullDashboardEventBroadcaster : ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster
{
public static readonly NullDashboardEventBroadcaster Instance = new();
public void Publish(string sessionId, ZB.MOM.WW.MxGateway.Contracts.Proto.MxEvent mxEvent) { }
}
}
@@ -278,9 +278,16 @@ public sealed class EventStreamServiceTests
}),
new MxAccessGrpcMapper(),
metrics ?? new GatewayMetrics(),
NullDashboardEventBroadcaster.Instance,
NullLogger<EventStreamService>.Instance);
}
private sealed class NullDashboardEventBroadcaster : ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster
{
public static readonly NullDashboardEventBroadcaster Instance = new();
public void Publish(string sessionId, MxEvent mxEvent) { }
}
private static async Task<List<MxEvent>> CollectEventsAsync(
EventStreamService service,
string sessionId)