using System.Runtime.CompilerServices; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using ZB.MOM.WW.MxGateway.Contracts; using ZB.MOM.WW.MxGateway.Contracts.Proto; using ZB.MOM.WW.MxGateway.Server.Configuration; using ZB.MOM.WW.MxGateway.Server.Dashboard.Hubs; using ZB.MOM.WW.MxGateway.Server.Grpc; using ZB.MOM.WW.MxGateway.Server.Metrics; using ZB.MOM.WW.MxGateway.Server.Sessions; using ZB.MOM.WW.MxGateway.Server.Workers; using ZB.MOM.WW.MxGateway.Tests.TestSupport; namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Sessions; /// /// Task 6 regression tests for the internal dashboard mirror. The dashboard is a /// first-class subscriber on the session's , so it /// receives session events whether or not a gRPC client is streaming — fixing the /// "dark feed" where the dashboard only saw events while a gRPC client was actively /// streaming (the inline per-RPC tap removed by this task). /// public sealed class GatewaySessionDashboardMirrorTests { private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5); /// /// The KEY bug-fix test: the dashboard broadcaster receives session events even when /// NO gRPC StreamEvents subscriber is attached. The session is driven to Ready /// with a fake worker emitting events; only the internal dashboard subscriber exists. /// Before Task 6 the mirror lived inside the per-RPC gRPC loop, so with no gRPC /// subscriber the dashboard saw nothing. /// [Fact] public async Task DashboardMirror_ReceivesEvents_WithNoGrpcSubscriber() { FakeWorkerClient workerClient = new(); workerClient.Events.Add(CreateWorkerEvent(10, MxEventFamily.OnDataChange)); workerClient.Events.Add(CreateWorkerEvent(11, MxEventFamily.OnWriteComplete)); workerClient.CompleteAfterConfiguredEvents = true; RecordingDashboardEventBroadcaster broadcaster = new(); await using GatewaySession session = CreateSession(workerClient, broadcaster); session.AttachWorkerClient(workerClient); // MarkReady starts the internal dashboard mirror; no gRPC subscriber is ever attached. session.MarkReady(); await WaitUntilAsync(() => broadcaster.Captures.Count == 2); IReadOnlyList captures = broadcaster.Captures; Assert.Equal(0, session.ActiveEventSubscriberCount); Assert.Equal([10UL, 11UL], captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray()); Assert.All(captures, capture => Assert.Equal(session.SessionId, capture.SessionId)); } /// /// A gRPC subscriber and the dashboard both receive every event concurrently. The /// gRPC path is no longer the dashboard's source — both read independent leases fed by /// the single distributor pump. /// [Fact] public async Task DashboardMirror_AndGrpcSubscriber_BothReceiveEvents() { FakeWorkerClient workerClient = new(); workerClient.Events.Add(CreateWorkerEvent(1, MxEventFamily.OnDataChange)); workerClient.Events.Add(CreateWorkerEvent(2, MxEventFamily.OnDataChange)); workerClient.Events.Add(CreateWorkerEvent(3, MxEventFamily.OnWriteComplete)); workerClient.CompleteAfterConfiguredEvents = true; RecordingDashboardEventBroadcaster broadcaster = new(); await using GatewaySession session = CreateSession(workerClient, broadcaster); session.AttachWorkerClient(workerClient); session.MarkReady(); EventStreamService service = new( new SingleSessionManager(session), Options.Create(new GatewayOptions { Events = new EventOptions { QueueCapacity = 8 } }), new GatewayMetrics()); List grpcEvents = []; await foreach (MxEvent mxEvent in service .StreamEventsAsync(new StreamEventsRequest { SessionId = session.SessionId }, CancellationToken.None) .WithCancellation(CancellationToken.None)) { grpcEvents.Add(mxEvent); } await WaitUntilAsync(() => broadcaster.Captures.Count == 3); Assert.Equal([1UL, 2UL, 3UL], grpcEvents.Select(mxEvent => mxEvent.WorkerSequence).ToArray()); Assert.Equal([1UL, 2UL, 3UL], broadcaster.Captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray()); } /// /// Task 4 hazard guard: starting the pump at Ready with a fast-completing worker stream /// and zero subscribers used to drain into nothing and leave a later subscriber hanging. /// Now the dashboard subscriber is registered BEFORE the pump starts, so even a worker /// stream that completes immediately delivers every event to the dashboard with no hang. /// [Fact] public async Task DashboardMirror_FastCompletingWorkerStream_DeliversAllEventsWithoutHang() { FakeWorkerClient workerClient = new(); workerClient.Events.Add(CreateWorkerEvent(1, MxEventFamily.OnDataChange)); workerClient.Events.Add(CreateWorkerEvent(2, MxEventFamily.OnDataChange)); workerClient.CompleteAfterConfiguredEvents = true; RecordingDashboardEventBroadcaster broadcaster = new(); await using GatewaySession session = CreateSession(workerClient, broadcaster); session.AttachWorkerClient(workerClient); session.MarkReady(); await WaitUntilAsync(() => broadcaster.Captures.Count == 2); Assert.Equal([1UL, 2UL], broadcaster.Captures.Select(capture => capture.MxEvent.WorkerSequence).ToArray()); } /// /// The dashboard Publish must be never-throw at the seam too: a throwing broadcaster /// must not fault the session or stop the mirror from continuing past the failure. /// [Fact] public async Task DashboardMirror_WhenBroadcasterThrows_DoesNotFaultSessionAndKeepsMirroring() { FakeWorkerClient workerClient = new(); workerClient.Events.Add(CreateWorkerEvent(1, MxEventFamily.OnDataChange)); workerClient.Events.Add(CreateWorkerEvent(2, MxEventFamily.OnDataChange)); workerClient.CompleteAfterConfiguredEvents = true; ThrowingDashboardEventBroadcaster broadcaster = new(); await using GatewaySession session = CreateSession(workerClient, broadcaster); session.AttachWorkerClient(workerClient); session.MarkReady(); await WaitUntilAsync(() => broadcaster.PublishAttempts == 2); Assert.NotEqual(SessionState.Faulted, session.State); } /// /// The internal dashboard subscriber must NOT count against the single-subscriber /// guard: a gRPC subscriber can still attach while the dashboard mirror is running. /// [Fact] public async Task DashboardMirror_DoesNotCountAgainstSingleSubscriberGuard() { FakeWorkerClient workerClient = new(); RecordingDashboardEventBroadcaster broadcaster = new(); await using GatewaySession session = CreateSession(workerClient, broadcaster); session.AttachWorkerClient(workerClient); session.MarkReady(); Assert.Equal(0, session.ActiveEventSubscriberCount); using IEventSubscriberLease lease = session.AttachEventSubscriber(maxSubscribers: 1); Assert.Equal(1, session.ActiveEventSubscriberCount); } private static GatewaySession CreateSession( IWorkerClient workerClient, IDashboardEventBroadcaster broadcaster) { return new GatewaySession( sessionId: "session-dashboard-mirror", backendName: GatewayContractInfo.DefaultBackendName, pipeName: "mxaccess-gateway-1-session-dashboard-mirror", nonce: "nonce", clientIdentity: "client-1", ownerKeyId: null, clientSessionName: "test-session", clientCorrelationId: "client-correlation-1", commandTimeout: TimeSpan.FromSeconds(5), startupTimeout: TimeSpan.FromSeconds(5), shutdownTimeout: TimeSpan.FromSeconds(5), leaseDuration: TimeSpan.FromMinutes(30), openedAt: DateTimeOffset.UtcNow, eventStreaming: new SessionEventStreaming( new MxAccessGrpcMapper(), new EventOptions { QueueCapacity = 8 }, NullLogger.Instance, TimeProvider.System, new GatewayMetrics(), broadcaster)); } private static WorkerEvent CreateWorkerEvent(ulong sequence, MxEventFamily family) { MxEvent mxEvent = new() { SessionId = "session-dashboard-mirror", Family = family, WorkerSequence = sequence, }; switch (family) { case MxEventFamily.OnDataChange: mxEvent.OnDataChange = new OnDataChangeEvent(); break; case MxEventFamily.OnWriteComplete: mxEvent.OnWriteComplete = new OnWriteCompleteEvent(); break; } return new WorkerEvent { Event = mxEvent }; } private static async Task WaitUntilAsync(Func predicate, [CallerArgumentExpression(nameof(predicate))] string? condition = null) { using CancellationTokenSource cancellationTokenSource = new(TestTimeout); try { while (!predicate()) { await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token); } } catch (OperationCanceledException) { Assert.Fail($"Timed out after {TestTimeout.TotalSeconds}s waiting for: {condition}"); } } private sealed class ThrowingDashboardEventBroadcaster : IDashboardEventBroadcaster { private int _publishAttempts; public int PublishAttempts => Volatile.Read(ref _publishAttempts); public void Publish(string sessionId, MxEvent mxEvent) { Interlocked.Increment(ref _publishAttempts); throw new InvalidOperationException("simulated dashboard broadcaster failure"); } } private sealed class SingleSessionManager(GatewaySession session) : ISessionManager { public Task OpenSessionAsync( SessionOpenRequest request, string? clientIdentity, string? ownerKeyId, CancellationToken cancellationToken) => Task.FromResult(session); public bool TryGetSession(string sessionId, out GatewaySession gatewaySession) { gatewaySession = session; return string.Equals(sessionId, session.SessionId, StringComparison.Ordinal); } public Task InvokeAsync( string sessionId, WorkerCommand command, CancellationToken cancellationToken) => Task.FromResult(new WorkerCommandReply()); public IAsyncEnumerable ReadEventsAsync( string sessionId, CancellationToken cancellationToken) => session.ReadEventsAsync(cancellationToken); public Task CloseSessionAsync( string sessionId, CancellationToken cancellationToken) => Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false)); public Task KillWorkerAsync( string sessionId, string reason, CancellationToken cancellationToken) => Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false)); public Task CloseExpiredLeasesAsync( DateTimeOffset now, CancellationToken cancellationToken) => Task.FromResult(0); public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask; } private sealed class FakeWorkerClient : IWorkerClient { public List Events { get; } = []; public bool CompleteAfterConfiguredEvents { get; set; } public string SessionId { get; } = "session-dashboard-mirror"; public int? ProcessId { get; } = 1234; public WorkerClientState State { get; } = WorkerClientState.Ready; public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow; public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask; public Task InvokeAsync( WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken) => Task.FromResult(new WorkerCommandReply()); public async IAsyncEnumerable ReadEventsAsync( [EnumeratorCancellation] CancellationToken cancellationToken) { foreach (WorkerEvent workerEvent in Events) { cancellationToken.ThrowIfCancellationRequested(); yield return workerEvent; } if (CompleteAfterConfiguredEvents) { yield break; } await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken); } public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask; public void Kill(string reason) { } public ValueTask DisposeAsync() => ValueTask.CompletedTask; } }