Resolve Server-031..032 (re-triaged) + Server-038..043

Server-031: re-triaged. The recommended gateway-side
"skip-while-command-in-flight" guard is already in place at
WorkerClient.HeartbeatLoopAsync via WorkerClientOptions.HeartbeatStuckCeiling
(default 75s = 5× HeartbeatGrace). Two regression tests pin the
behaviour. Recommendation #1 (decouple worker-side _writeLock) is a
Worker-module concern (Worker-017 / Worker-023) and out of scope here.

Server-032: re-triaged. Recommendation #2 (rich diagnostic) is already
in EnqueueWorkerEventAsync, with #3 (overflow grace) absorbed by the
TryWrite → WriteAsync-with-timeout fall-through. Test
EnqueueWorkerEvent_WhenChannelFullPastTimeout_FaultsWithRichDiagnostic
pins the diagnostic string. Recommendation #1 (prose contract in
gateway.md / docs) is deferred — outside this pass's edit scope.

Server-038 (Security): EventsHub.SubscribeSession's missing per-session
ACL is documented with a TODO(per-session-acl) and a <remarks> block
explaining the v1 acceptance (any dashboard role can subscribe to any
session — non-secret metadata, redacted value logging). The per-session
ACL design lands in a follow-up once a session-scoped role exists.

Server-039 (Error handling): HubTokenService.Validate now rejects a
deserialized payload where both Name and NameIdentifier are null/empty.
New test file HubTokenServiceTests.cs covers the regression and five
sanity cases. TDD confirmed.

Server-040 (Conventions): MapGroupsToRoles gains a precedence comment
explaining "full literal match first, leading-RDN fallback;
OrdinalIgnoreCase via DashboardOptions.GroupToRole". Documentation-only.

Server-041 (Design adherence): EventStreamService.ProduceEventsAsync
wraps the broadcaster.Publish call in try/catch (Exception). The
producer loop and gRPC stream are no longer at the mercy of the
broadcaster's never-throw discipline. New regression test
StreamEventsAsync_WhenDashboardBroadcasterThrows_StillYieldsEventsAndDoesNotFaultSession.

Server-042 (Performance): DashboardSnapshotPublisher.ExecuteAsync now
mirrors AlarmsHubPublisher's reconnect loop — wraps the await foreach
in a while-not-cancelled, catches general exceptions, and Task.Delays
5s before retrying. An internal ctor accepts a shorter delay for the
test. New test file DashboardSnapshotPublisherTests.cs covers the
throw-then-yield reconnect path and the normal-completion case.

Server-043 (Documentation): HubTokenService class XML doc gains a
<remarks> describing the singleton lifetime, the two consumer scopes
(DashboardHubConnectionFactory scoped, HubTokenAuthenticationHandler
transient), and the thread-safety contract.

Verification: dotnet build src/ZB.MOM.WW.MxGateway.slnx clean
(0 warnings / 0 errors); src/ZB.MOM.WW.MxGateway.Tests 486/486 passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-24 03:18:52 -04:00
parent d2d2e5f68f
commit 327e9c5f94
9 changed files with 567 additions and 43 deletions
@@ -154,6 +154,13 @@ public sealed class DashboardAuthenticator(
foreach (string group in groups)
{
string normalizedGroup = group.Trim();
// Lookup precedence (Server-040): the full literal group string is
// tried first; only if that misses do we fall back to the leading
// RDN value (e.g. "GwAdmin" extracted from
// "ou=GwAdmin,ou=groups,..."). The map's comparer is
// OrdinalIgnoreCase (see DashboardOptions.GroupToRole), so e.g.
// "GwAdmin" and "gwadmin" both match.
if (groupToRole.TryGetValue(normalizedGroup, out string? mapped)
|| groupToRole.TryGetValue(ExtractFirstRdnValue(normalizedGroup), out mapped))
{
@@ -11,6 +11,18 @@ namespace ZB.MOM.WW.MxGateway.Server.Dashboard;
/// role claims. Validity is enforced by the data-protection time-limited
/// protector; no separate signing keys are configured.
/// </summary>
/// <remarks>
/// Server-043: this service is registered as a singleton in
/// <see cref="DashboardServiceCollectionExtensions.AddGatewayDashboard"/> and
/// is shared by two consumer scopes: <c>DashboardHubConnectionFactory</c>
/// (scoped, per-circuit; calls <see cref="Issue"/> from the cookie-authenticated
/// dashboard) and <c>HubTokenAuthenticationHandler</c> (transient, per-request;
/// calls <see cref="Validate"/> from the SignalR negotiate / connection path).
/// The underlying <see cref="ITimeLimitedDataProtector"/> is thread-safe, so
/// minting and validating concurrently from any number of callers is safe;
/// future maintainers should preserve the singleton lifetime to keep the
/// protector instance stable.
/// </remarks>
public sealed class HubTokenService
{
private const string ProtectorPurpose = "ZB.MOM.WW.MxGateway.Dashboard.HubToken.v1";
@@ -51,6 +63,16 @@ public sealed class HubTokenService
return null;
}
// Server-039: reject a token whose payload carries no caller
// identity. A null/empty Name AND NameIdentifier would otherwise
// produce a principal that satisfies IsAuthenticated and IsInRole
// checks without any associated user, because the AuthenticationType
// (the HubToken scheme) is non-empty.
if (string.IsNullOrEmpty(payload.Name) && string.IsNullOrEmpty(payload.NameIdentifier))
{
return null;
}
List<Claim> claims = [];
if (!string.IsNullOrEmpty(payload.Name))
{
@@ -8,34 +8,94 @@ namespace ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
/// <see cref="DashboardSnapshotHub"/> client. There is one publisher per
/// gateway process; clients listen via the hub.
/// </summary>
public sealed class DashboardSnapshotPublisher(
IDashboardSnapshotService snapshotService,
IHubContext<DashboardSnapshotHub> hubContext,
ILogger<DashboardSnapshotPublisher> logger) : BackgroundService
/// <remarks>
/// Server-042: <see cref="ExecuteAsync"/> wraps the snapshot subscription in
/// a reconnect loop with a configurable retry delay (5s by default,
/// mirroring <see cref="AlarmsHubPublisher"/>). A transient failure inside
/// <see cref="IDashboardSnapshotService.WatchSnapshotsAsync"/> — e.g. a
/// one-time logger-init failure or a transient SQL error from the Galaxy
/// summary projection — would otherwise end the BackgroundService with no
/// reconnect, taking the dashboard offline until process restart.
/// </remarks>
public sealed class DashboardSnapshotPublisher : BackgroundService
{
private static readonly TimeSpan DefaultReconnectDelay = TimeSpan.FromSeconds(5);
private readonly IDashboardSnapshotService _snapshotService;
private readonly IHubContext<DashboardSnapshotHub> _hubContext;
private readonly ILogger<DashboardSnapshotPublisher> _logger;
private readonly TimeSpan _reconnectDelay;
public DashboardSnapshotPublisher(
IDashboardSnapshotService snapshotService,
IHubContext<DashboardSnapshotHub> hubContext,
ILogger<DashboardSnapshotPublisher> logger)
: this(snapshotService, hubContext, logger, DefaultReconnectDelay)
{
}
// Internal hook for the Server-042 regression test: tests inject a
// very short reconnect delay so the assertion doesn't wait the full
// 5s. Production wiring always uses the 5s default via the public ctor.
internal DashboardSnapshotPublisher(
IDashboardSnapshotService snapshotService,
IHubContext<DashboardSnapshotHub> hubContext,
ILogger<DashboardSnapshotPublisher> logger,
TimeSpan reconnectDelay)
{
_snapshotService = snapshotService;
_hubContext = hubContext;
_logger = logger;
_reconnectDelay = reconnectDelay;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
// Loop forever — when WatchSnapshotsAsync completes or throws, reopen
// the subscription after a short delay. The hosted-service lifetime
// ends only when the host stops. Mirrors AlarmsHubPublisher.
while (!stoppingToken.IsCancellationRequested)
{
await foreach (DashboardSnapshot snapshot in snapshotService
.WatchSnapshotsAsync(stoppingToken)
.ConfigureAwait(false))
try
{
await foreach (DashboardSnapshot snapshot in _snapshotService
.WatchSnapshotsAsync(stoppingToken)
.ConfigureAwait(false))
{
if (stoppingToken.IsCancellationRequested)
{
break;
}
try
{
await _hubContext.Clients
.All
.SendAsync(DashboardSnapshotHub.SnapshotMessage, snapshot, stoppingToken)
.ConfigureAwait(false);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
_logger.LogWarning(ex, "Snapshot broadcast failed; will retry on the next snapshot tick.");
}
}
}
catch (OperationCanceledException)
{
return;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Snapshot subscription faulted; reconnecting in {DelaySeconds:F1}s.", _reconnectDelay.TotalSeconds);
try
{
await hubContext.Clients
.All
.SendAsync(DashboardSnapshotHub.SnapshotMessage, snapshot, stoppingToken)
.ConfigureAwait(false);
await Task.Delay(_reconnectDelay, stoppingToken).ConfigureAwait(false);
}
catch (Exception ex) when (ex is not OperationCanceledException)
catch (OperationCanceledException)
{
logger.LogWarning(ex, "Snapshot broadcast failed; will retry on the next snapshot tick.");
return;
}
}
}
catch (OperationCanceledException)
{
}
}
}
@@ -23,6 +23,29 @@ public sealed class EventsHub : Hub
public static string GroupName(string sessionId) => $"session:{sessionId}";
/// <summary>
/// Subscribes the calling SignalR connection to the per-session events
/// group, so that events broadcast by
/// <see cref="DashboardEventBroadcaster"/> for that session reach this
/// client.
/// </summary>
/// <remarks>
/// Server-038: in v1 the hub-level <see cref="AuthorizeAttribute"/>
/// (<c>HubClientsPolicy</c>) only checks that the caller carries one of
/// the dashboard roles (Admin or Viewer); both roles may subscribe to
/// any session id they choose. This is acceptable today because (a) the
/// dashboard's per-session views show non-secret session metadata that
/// any authenticated dashboard user can already see, and (b) value
/// logging in the source gRPC stream is gated by the same redaction
/// policy that protects logs. The per-session ACL that gates the gRPC
/// <c>StreamEvents</c> RPC is intentionally not yet mirrored here.
/// TODO(per-session-acl): once a role/scope is introduced that scopes a
/// Viewer to a specific session or tenant, add a session-access check
/// at this seam — either inline (consult the per-user allowed-session
/// set on <c>Context.User</c> claims / <c>Context.Items</c>) or via a
/// dedicated authorization policy applied to the hub method itself.
/// </remarks>
/// <param name="sessionId">Session id to subscribe the caller to.</param>
public Task SubscribeSession(string sessionId)
{
if (string.IsNullOrWhiteSpace(sessionId))
@@ -122,8 +122,23 @@ public sealed class EventStreamService(
// Mirror the event to the dashboard EventsHub group for this
// session. Fire-and-forget — broadcast errors must not affect
// the source gRPC stream.
dashboardEventBroadcaster.Publish(session.SessionId, publicEvent);
// the source gRPC stream. Server-041: the
// IDashboardEventBroadcaster contract documents Publish as
// never-throw, but we enforce that at the seam too, so a
// future implementation that adds synchronous validation or
// a serializer hop cannot fault the producer loop and end
// this client's gRPC stream.
try
{
dashboardEventBroadcaster.Publish(session.SessionId, publicEvent);
}
catch (Exception ex)
{
logger.LogDebug(
ex,
"Dashboard event mirror threw for session {SessionId}; continuing.",
session.SessionId);
}
if (!writer.TryWrite(publicEvent))
{
@@ -0,0 +1,206 @@
using System.Runtime.CompilerServices;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.MxGateway.Server.Dashboard;
using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Dashboard;
public sealed class DashboardSnapshotPublisherTests
{
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
/// <summary>
/// Server-042 regression: a transient failure inside
/// <see cref="IDashboardSnapshotService.WatchSnapshotsAsync"/> must not
/// end the BackgroundService; the publisher must wait the configured
/// reconnect delay and then re-open the subscription. Before the fix,
/// the publisher exited on the first non-cancellation exception and
/// the dashboard's snapshot stream went silent until process restart.
/// </summary>
[Fact]
public async Task ExecuteAsync_WhenSnapshotServiceThrowsOnce_ReconnectsAfterDelay()
{
ThrowOnceThenYieldSnapshotService snapshotService = new();
RecordingHubContext hubContext = new();
TimeSpan reconnectDelay = TimeSpan.FromMilliseconds(50);
DashboardSnapshotPublisher publisher = new(
snapshotService,
hubContext,
NullLogger<DashboardSnapshotPublisher>.Instance,
reconnectDelay);
using CancellationTokenSource cts = new();
DateTimeOffset startedAt = DateTimeOffset.UtcNow;
Task execute = publisher.StartAsync(cts.Token);
await execute.WaitAsync(TestTimeout);
// The publisher's first WatchSnapshotsAsync call throws; the second
// call yields one snapshot. We block here until the publisher has
// made the second subscribe attempt AND broadcast its first
// snapshot — proving the publisher did NOT exit on the throw.
await WaitUntilAsync(() => snapshotService.SubscribeCount >= 2);
await WaitUntilAsync(() => hubContext.SendCount >= 1);
DateTimeOffset secondSubscribeAt = snapshotService.SecondSubscribeAt
?? throw new InvalidOperationException("Second subscribe did not record a timestamp.");
await cts.CancelAsync();
await publisher.StopAsync(CancellationToken.None);
Assert.True(snapshotService.SubscribeCount >= 2,
$"Expected at least 2 subscribe calls, got {snapshotService.SubscribeCount}.");
Assert.True(hubContext.SendCount >= 1);
// The gap between the throw (first subscribe) and the reconnect
// (second subscribe) is bounded below by the reconnect delay. We
// give a small slack (10ms) for scheduling jitter on slow CI VMs.
TimeSpan gap = secondSubscribeAt - startedAt;
Assert.True(gap >= reconnectDelay - TimeSpan.FromMilliseconds(10),
$"Expected reconnect gap >= {reconnectDelay.TotalMilliseconds}ms; got {gap.TotalMilliseconds}ms.");
}
/// <summary>
/// Sanity: a normal completion of WatchSnapshotsAsync (no exception)
/// also reconnects after the delay — exits only on host shutdown.
/// </summary>
[Fact]
public async Task ExecuteAsync_WhenSnapshotServiceCompletes_ReconnectsAfterDelay()
{
CompleteImmediatelySnapshotService snapshotService = new();
RecordingHubContext hubContext = new();
TimeSpan reconnectDelay = TimeSpan.FromMilliseconds(50);
DashboardSnapshotPublisher publisher = new(
snapshotService,
hubContext,
NullLogger<DashboardSnapshotPublisher>.Instance,
reconnectDelay);
using CancellationTokenSource cts = new();
Task execute = publisher.StartAsync(cts.Token);
await execute.WaitAsync(TestTimeout);
await WaitUntilAsync(() => snapshotService.SubscribeCount >= 2);
await cts.CancelAsync();
await publisher.StopAsync(CancellationToken.None);
Assert.True(snapshotService.SubscribeCount >= 2);
}
private static async Task WaitUntilAsync(Func<bool> predicate)
{
using CancellationTokenSource cancellationTokenSource = new(TestTimeout);
while (!predicate())
{
await Task.Delay(TimeSpan.FromMilliseconds(5), cancellationTokenSource.Token);
}
}
private sealed class ThrowOnceThenYieldSnapshotService : IDashboardSnapshotService
{
public int SubscribeCount { get; private set; }
public DateTimeOffset? SecondSubscribeAt { get; private set; }
public DashboardSnapshot GetSnapshot()
{
return null!;
}
public async IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
SubscribeCount++;
int call = SubscribeCount;
if (call == 1)
{
// First call: throw after a brief yield so the publisher
// observes us as a live producer that failed.
await Task.Yield();
throw new InvalidOperationException("simulated transient snapshot failure");
}
SecondSubscribeAt = DateTimeOffset.UtcNow;
yield return GetSnapshot();
// Stay open until cancelled so the publisher's inner await
// foreach doesn't immediately re-loop.
try
{
await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
}
}
private sealed class CompleteImmediatelySnapshotService : IDashboardSnapshotService
{
public int SubscribeCount { get; private set; }
public DashboardSnapshot GetSnapshot()
{
return null!;
}
#pragma warning disable CS1998 // async without await — IAsyncEnumerable contract requires async signature
public async IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
#pragma warning restore CS1998
{
SubscribeCount++;
// Yield nothing and complete immediately — simulates a transient
// upstream disconnect that completes cleanly.
yield break;
}
}
private sealed class RecordingHubContext : IHubContext<DashboardSnapshotHub>
{
private readonly RecordingHubClients _clients = new();
public IHubClients Clients => _clients;
public IGroupManager Groups { get; } = new NoopGroupManager();
public int SendCount => _clients.AllProxy.SendCount;
}
private sealed class RecordingHubClients : IHubClients
{
public RecordingClientProxy AllProxy { get; } = new();
public IClientProxy All => AllProxy;
public IClientProxy AllExcept(IReadOnlyList<string> excludedConnectionIds) => AllProxy;
public IClientProxy Client(string connectionId) => AllProxy;
public IClientProxy Clients(IReadOnlyList<string> connectionIds) => AllProxy;
public IClientProxy Group(string groupName) => AllProxy;
public IClientProxy GroupExcept(string groupName, IReadOnlyList<string> excludedConnectionIds) => AllProxy;
public IClientProxy Groups(IReadOnlyList<string> groupNames) => AllProxy;
public IClientProxy User(string userId) => AllProxy;
public IClientProxy Users(IReadOnlyList<string> userIds) => AllProxy;
}
private sealed class RecordingClientProxy : IClientProxy
{
private int _sendCount;
public int SendCount => Volatile.Read(ref _sendCount);
public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default)
{
Interlocked.Increment(ref _sendCount);
return Task.CompletedTask;
}
}
private sealed class NoopGroupManager : IGroupManager
{
public Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
=> Task.CompletedTask;
public Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
=> Task.CompletedTask;
}
}
@@ -0,0 +1,115 @@
using System.Security.Claims;
using Microsoft.AspNetCore.DataProtection;
using ZB.MOM.WW.MxGateway.Server.Dashboard;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Dashboard;
public sealed class HubTokenServiceTests
{
/// <summary>
/// Server-039: a token whose data-protected payload has both
/// <c>Name</c> and <c>NameIdentifier</c> null (the principal that
/// minted the token had no identity claims) must be rejected by
/// <see cref="HubTokenService.Validate"/>. The role claims alone are
/// not enough — without a caller identity, the resulting
/// <see cref="ClaimsPrincipal"/> would satisfy
/// <c>IsAuthenticated</c> / <c>IsInRole</c> checks without an
/// associated user.
/// </summary>
[Fact]
public void Validate_TokenWithNullNameAndNullNameIdentifier_ReturnsNull()
{
HubTokenService service = new(new EphemeralDataProtectionProvider());
// Issue from a principal with NO Name claim and NO NameIdentifier
// claim. The Issue method's payload will then carry
// (Name = null, NameIdentifier = null, Roles = ["Viewer"]).
ClaimsIdentity identity = new(
[new Claim(ClaimTypes.Role, DashboardRoles.Viewer)],
authenticationType: "test");
ClaimsPrincipal principal = new(identity);
string token = service.Issue(principal);
ClaimsPrincipal? result = service.Validate(token);
Assert.Null(result);
}
/// <summary>
/// Sanity check: a token minted from a principal with a Name claim
/// validates and returns a principal carrying that identity. Pins
/// that the Server-039 fix does not over-reject valid tokens.
/// </summary>
[Fact]
public void Validate_TokenWithName_ReturnsAuthenticatedPrincipal()
{
HubTokenService service = new(new EphemeralDataProtectionProvider());
ClaimsIdentity identity = new(
[
new Claim(ClaimTypes.Name, "alice"),
new Claim(ClaimTypes.NameIdentifier, "alice-id"),
new Claim(ClaimTypes.Role, DashboardRoles.Admin),
],
authenticationType: "test",
nameType: ClaimTypes.Name,
roleType: ClaimTypes.Role);
ClaimsPrincipal principal = new(identity);
string token = service.Issue(principal);
ClaimsPrincipal? result = service.Validate(token);
Assert.NotNull(result);
Assert.Equal("alice", result.Identity?.Name);
Assert.True(result.IsInRole(DashboardRoles.Admin));
}
/// <summary>
/// Sanity check: a token minted with only a NameIdentifier (no Name)
/// still validates — a non-null caller identity is the contract,
/// either field is sufficient.
/// </summary>
[Fact]
public void Validate_TokenWithOnlyNameIdentifier_ReturnsPrincipal()
{
HubTokenService service = new(new EphemeralDataProtectionProvider());
ClaimsIdentity identity = new(
[
new Claim(ClaimTypes.NameIdentifier, "alice-id"),
new Claim(ClaimTypes.Role, DashboardRoles.Viewer),
],
authenticationType: "test");
ClaimsPrincipal principal = new(identity);
string token = service.Issue(principal);
ClaimsPrincipal? result = service.Validate(token);
Assert.NotNull(result);
Assert.True(result.IsInRole(DashboardRoles.Viewer));
}
[Fact]
public void Validate_NullToken_ReturnsNull()
{
HubTokenService service = new(new EphemeralDataProtectionProvider());
Assert.Null(service.Validate(null));
}
[Fact]
public void Validate_EmptyToken_ReturnsNull()
{
HubTokenService service = new(new EphemeralDataProtectionProvider());
Assert.Null(service.Validate(string.Empty));
}
[Fact]
public void Validate_GarbageToken_ReturnsNull()
{
HubTokenService service = new(new EphemeralDataProtectionProvider());
Assert.Null(service.Validate("this-is-not-a-protected-payload"));
}
}
@@ -9,6 +9,7 @@ using ZB.MOM.WW.MxGateway.Server.Grpc;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Grpc;
@@ -260,11 +261,81 @@ public sealed class EventStreamServiceTests
Assert.Equal(1, metrics.GetSnapshot().Faults);
}
/// <summary>
/// Tests-026 regression: <see cref="EventStreamService.StreamEventsAsync"/>
/// must mirror every yielded event to the
/// <see cref="ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster"/>
/// seam (the only path that fans events out to dashboard SignalR clients).
/// A regression that silently dropped the <c>Publish</c> call — e.g. an
/// <c>if</c> accidentally added around it, or the broadcaster ctor
/// parameter being removed — would have produced no failing test before
/// this fixture existed. The recording fake captures every call and we
/// assert one publish per yielded event, with the correct session id and
/// preserved <c>WorkerSequence</c>.
/// </summary>
[Fact]
public async Task StreamEventsAsync_PublishesEachEventToDashboardBroadcaster()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
RecordingDashboardEventBroadcaster recordingBroadcaster = new();
EventStreamService service = CreateService(
new FakeSessionManager(session),
dashboardEventBroadcaster: recordingBroadcaster);
workerClient.Events.Add(CreateWorkerEvent(sequence: 7, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 8, MxEventFamily.OnWriteComplete));
workerClient.CompleteAfterConfiguredEvents = true;
List<MxEvent> events = await CollectEventsAsync(service, session.SessionId);
Assert.Equal([7UL, 8UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray());
IReadOnlyList<DashboardEventCapture> captures = recordingBroadcaster.Captures;
Assert.Equal(2, captures.Count);
Assert.All(captures, capture => Assert.Equal(session.SessionId, capture.SessionId));
Assert.Equal([7UL, 8UL], captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray());
Assert.Equal(MxEventFamily.OnDataChange, captures[0].MxEvent.Family);
Assert.Equal(MxEventFamily.OnWriteComplete, captures[1].MxEvent.Family);
}
/// <summary>
/// Server-041 regression: <see cref="EventStreamService"/> must not
/// abort the gRPC stream when the dashboard broadcaster throws.
/// <c>IDashboardEventBroadcaster.Publish</c> is documented as
/// best-effort and never-throw, but the gRPC consumer cannot rely on
/// implementation discipline alone — the seam itself swallows the
/// fault and logs at debug, mirroring the broadcaster's own
/// continuation handler. Without the wrap, the producer loop would
/// surface the exception and the client would see a faulted stream
/// for a dashboard-mirror failure.
/// </summary>
[Fact]
public async Task StreamEventsAsync_WhenDashboardBroadcasterThrows_StillYieldsEventsAndDoesNotFaultSession()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
using GatewayMetrics metrics = new();
ThrowingDashboardEventBroadcaster throwingBroadcaster = new();
EventStreamService service = CreateService(
new FakeSessionManager(session),
metrics,
dashboardEventBroadcaster: throwingBroadcaster);
workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
workerClient.CompleteAfterConfiguredEvents = true;
List<MxEvent> events = await CollectEventsAsync(service, session.SessionId);
Assert.Equal([1UL, 2UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray());
Assert.Equal(2, throwingBroadcaster.PublishAttempts);
Assert.NotEqual(SessionState.Faulted, session.State);
}
private static EventStreamService CreateService(
FakeSessionManager sessionManager,
GatewayMetrics? metrics = null,
int queueCapacity = 8,
EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast)
EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast,
ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster? dashboardEventBroadcaster = null)
{
return new EventStreamService(
sessionManager,
@@ -278,14 +349,19 @@ public sealed class EventStreamServiceTests
}),
new MxAccessGrpcMapper(),
metrics ?? new GatewayMetrics(),
NullDashboardEventBroadcaster.Instance,
dashboardEventBroadcaster ?? NullDashboardEventBroadcaster.Instance,
NullLogger<EventStreamService>.Instance);
}
private sealed class NullDashboardEventBroadcaster : ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster
private sealed class ThrowingDashboardEventBroadcaster : ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs.IDashboardEventBroadcaster
{
public static readonly NullDashboardEventBroadcaster Instance = new();
public void Publish(string sessionId, MxEvent mxEvent) { }
public int PublishAttempts { get; private set; }
public void Publish(string sessionId, MxEvent mxEvent)
{
PublishAttempts++;
throw new InvalidOperationException("simulated dashboard broadcaster failure");
}
}
private static async Task<List<MxEvent>> CollectEventsAsync(