From 3661420f0afa7a058cfed5ff89471f356d47d8bb Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 26 Apr 2026 17:49:41 -0400 Subject: [PATCH] Issue #15: implement dashboard snapshot service --- gateway.md | 2 + .../Dashboard/DashboardFaultSummary.cs | 9 + .../Dashboard/DashboardMetricSummary.cs | 6 + .../Dashboard/DashboardRedactor.cs | 34 ++ .../DashboardServiceCollectionExtensions.cs | 11 + .../Dashboard/DashboardSessionSummary.cs | 19 ++ .../Dashboard/DashboardSnapshot.cs | 15 + .../Dashboard/DashboardSnapshotService.cs | 196 ++++++++++++ .../Dashboard/DashboardWorkerSummary.cs | 10 + .../Dashboard/IDashboardSnapshotService.cs | 8 + src/MxGateway.Server/GatewayApplication.cs | 2 + .../DashboardSnapshotServiceTests.cs | 290 ++++++++++++++++++ 12 files changed, 602 insertions(+) create mode 100644 src/MxGateway.Server/Dashboard/DashboardFaultSummary.cs create mode 100644 src/MxGateway.Server/Dashboard/DashboardMetricSummary.cs create mode 100644 src/MxGateway.Server/Dashboard/DashboardRedactor.cs create mode 100644 src/MxGateway.Server/Dashboard/DashboardServiceCollectionExtensions.cs create mode 100644 src/MxGateway.Server/Dashboard/DashboardSessionSummary.cs create mode 100644 src/MxGateway.Server/Dashboard/DashboardSnapshot.cs create mode 100644 src/MxGateway.Server/Dashboard/DashboardSnapshotService.cs create mode 100644 src/MxGateway.Server/Dashboard/DashboardWorkerSummary.cs create mode 100644 src/MxGateway.Server/Dashboard/IDashboardSnapshotService.cs create mode 100644 src/MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotServiceTests.cs diff --git a/gateway.md b/gateway.md index 06fe919..2dfcbc2 100644 --- a/gateway.md +++ b/gateway.md @@ -107,6 +107,8 @@ worker, correlation, command, and client identity fields with redaction applied before values enter log state. `GatewayMetrics` exposes counters, gauges, and histograms through .NET `Meter` and a snapshot API that dashboard services can project without binding to a metrics exporter. +`DashboardSnapshotService` projects sessions, workers, metrics, faults, and +effective configuration into immutable DTOs for read-only dashboard rendering. ### Worker Process diff --git a/src/MxGateway.Server/Dashboard/DashboardFaultSummary.cs b/src/MxGateway.Server/Dashboard/DashboardFaultSummary.cs new file mode 100644 index 0000000..67285f4 --- /dev/null +++ b/src/MxGateway.Server/Dashboard/DashboardFaultSummary.cs @@ -0,0 +1,9 @@ +namespace MxGateway.Server.Dashboard; + +public sealed record DashboardFaultSummary( + string Source, + string? SessionId, + int? WorkerProcessId, + string State, + string Message, + DateTimeOffset ObservedAt); diff --git a/src/MxGateway.Server/Dashboard/DashboardMetricSummary.cs b/src/MxGateway.Server/Dashboard/DashboardMetricSummary.cs new file mode 100644 index 0000000..8b256a1 --- /dev/null +++ b/src/MxGateway.Server/Dashboard/DashboardMetricSummary.cs @@ -0,0 +1,6 @@ +namespace MxGateway.Server.Dashboard; + +public sealed record DashboardMetricSummary( + string Name, + long Value, + string? Dimension = null); diff --git a/src/MxGateway.Server/Dashboard/DashboardRedactor.cs b/src/MxGateway.Server/Dashboard/DashboardRedactor.cs new file mode 100644 index 0000000..b46556f --- /dev/null +++ b/src/MxGateway.Server/Dashboard/DashboardRedactor.cs @@ -0,0 +1,34 @@ +using MxGateway.Server.Diagnostics; + +namespace MxGateway.Server.Dashboard; + +internal static class DashboardRedactor +{ + private static readonly string[] SensitiveTextMarkers = + [ + "apikey", + "api_key", + "authorization", + "credential", + "password", + "secret", + "token", + ]; + + public static string? Redact(string? value) + { + if (string.IsNullOrWhiteSpace(value)) + { + return value; + } + + if (value.Contains("mxgw_", StringComparison.OrdinalIgnoreCase)) + { + return GatewayLogRedactor.RedactClientIdentity(value); + } + + return SensitiveTextMarkers.Any(marker => value.Contains(marker, StringComparison.OrdinalIgnoreCase)) + ? GatewayLogRedactor.RedactedValue + : value; + } +} diff --git a/src/MxGateway.Server/Dashboard/DashboardServiceCollectionExtensions.cs b/src/MxGateway.Server/Dashboard/DashboardServiceCollectionExtensions.cs new file mode 100644 index 0000000..cfbaa90 --- /dev/null +++ b/src/MxGateway.Server/Dashboard/DashboardServiceCollectionExtensions.cs @@ -0,0 +1,11 @@ +namespace MxGateway.Server.Dashboard; + +public static class DashboardServiceCollectionExtensions +{ + public static IServiceCollection AddGatewayDashboard(this IServiceCollection services) + { + services.AddSingleton(); + + return services; + } +} diff --git a/src/MxGateway.Server/Dashboard/DashboardSessionSummary.cs b/src/MxGateway.Server/Dashboard/DashboardSessionSummary.cs new file mode 100644 index 0000000..cbebae8 --- /dev/null +++ b/src/MxGateway.Server/Dashboard/DashboardSessionSummary.cs @@ -0,0 +1,19 @@ +using MxGateway.Contracts.Proto; +using MxGateway.Server.Workers; + +namespace MxGateway.Server.Dashboard; + +public sealed record DashboardSessionSummary( + string SessionId, + string BackendName, + SessionState State, + string? ClientIdentity, + string? ClientSessionName, + string? ClientCorrelationId, + DateTimeOffset OpenedAt, + DateTimeOffset LastClientActivityAt, + DateTimeOffset? LeaseExpiresAt, + int? WorkerProcessId, + WorkerClientState? WorkerState, + DateTimeOffset? LastWorkerHeartbeatAt, + string? LastFault); diff --git a/src/MxGateway.Server/Dashboard/DashboardSnapshot.cs b/src/MxGateway.Server/Dashboard/DashboardSnapshot.cs new file mode 100644 index 0000000..00aa0e0 --- /dev/null +++ b/src/MxGateway.Server/Dashboard/DashboardSnapshot.cs @@ -0,0 +1,15 @@ +using MxGateway.Server.Configuration; + +namespace MxGateway.Server.Dashboard; + +public sealed record DashboardSnapshot( + DateTimeOffset GeneratedAt, + DateTimeOffset GatewayStartedAt, + TimeSpan GatewayUptime, + string GatewayStatus, + string GatewayVersion, + IReadOnlyList Sessions, + IReadOnlyList Workers, + IReadOnlyList Metrics, + IReadOnlyList Faults, + EffectiveGatewayConfiguration Configuration); diff --git a/src/MxGateway.Server/Dashboard/DashboardSnapshotService.cs b/src/MxGateway.Server/Dashboard/DashboardSnapshotService.cs new file mode 100644 index 0000000..fb602ac --- /dev/null +++ b/src/MxGateway.Server/Dashboard/DashboardSnapshotService.cs @@ -0,0 +1,196 @@ +using System.Runtime.CompilerServices; +using Microsoft.Extensions.Options; +using MxGateway.Server.Configuration; +using MxGateway.Server.Metrics; +using MxGateway.Server.Sessions; +using MxGateway.Server.Workers; + +namespace MxGateway.Server.Dashboard; + +public sealed class DashboardSnapshotService : IDashboardSnapshotService +{ + private const string HealthyStatus = "Healthy"; + + private readonly ISessionRegistry _sessionRegistry; + private readonly GatewayMetrics _metrics; + private readonly IGatewayConfigurationProvider _configurationProvider; + private readonly TimeProvider _timeProvider; + private readonly DateTimeOffset _gatewayStartedAt; + private readonly TimeSpan _snapshotInterval; + private readonly int _recentFaultLimit; + private readonly int _recentSessionLimit; + + public DashboardSnapshotService( + ISessionRegistry sessionRegistry, + GatewayMetrics metrics, + IGatewayConfigurationProvider configurationProvider, + IOptions options, + TimeProvider? timeProvider = null) + { + _sessionRegistry = sessionRegistry ?? throw new ArgumentNullException(nameof(sessionRegistry)); + _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics)); + _configurationProvider = configurationProvider ?? throw new ArgumentNullException(nameof(configurationProvider)); + ArgumentNullException.ThrowIfNull(options); + + _timeProvider = timeProvider ?? TimeProvider.System; + _gatewayStartedAt = _timeProvider.GetUtcNow(); + _snapshotInterval = TimeSpan.FromMilliseconds(options.Value.Dashboard.SnapshotIntervalMilliseconds); + _recentFaultLimit = options.Value.Dashboard.RecentFaultLimit; + _recentSessionLimit = options.Value.Dashboard.RecentSessionLimit; + } + + public DashboardSnapshot GetSnapshot() + { + DateTimeOffset generatedAt = _timeProvider.GetUtcNow(); + IReadOnlyList sessions = _sessionRegistry.Snapshot() + .OrderByDescending(session => session.OpenedAt) + .ToArray(); + IReadOnlyList sessionSummaries = sessions + .Take(ResolveLimit(_recentSessionLimit)) + .Select(CreateSessionSummary) + .ToArray(); + IReadOnlyList workerSummaries = sessions + .Where(session => session.WorkerClient is not null) + .Select(CreateWorkerSummary) + .ToArray(); + GatewayMetricsSnapshot metricsSnapshot = _metrics.GetSnapshot(); + + return new DashboardSnapshot( + GeneratedAt: generatedAt, + GatewayStartedAt: _gatewayStartedAt, + GatewayUptime: generatedAt - _gatewayStartedAt, + GatewayStatus: HealthyStatus, + GatewayVersion: typeof(DashboardSnapshotService).Assembly.GetName().Version?.ToString() ?? "unknown", + Sessions: sessionSummaries, + Workers: workerSummaries, + Metrics: CreateMetricSummaries(metricsSnapshot), + Faults: CreateFaultSummaries(sessions, generatedAt), + Configuration: _configurationProvider.GetEffectiveConfiguration()); + } + + public async IAsyncEnumerable WatchSnapshotsAsync( + [EnumeratorCancellation] CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + yield break; + } + + yield return GetSnapshot(); + + using PeriodicTimer timer = new(_snapshotInterval, _timeProvider); + while (true) + { + bool hasNext; + try + { + hasNext = await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + yield break; + } + + if (!hasNext) + { + yield break; + } + + yield return GetSnapshot(); + } + } + + private static DashboardSessionSummary CreateSessionSummary(GatewaySession session) + { + IWorkerClient? workerClient = session.WorkerClient; + + return new DashboardSessionSummary( + SessionId: session.SessionId, + BackendName: session.BackendName, + State: session.State, + ClientIdentity: DashboardRedactor.Redact(session.ClientIdentity), + ClientSessionName: DashboardRedactor.Redact(session.ClientSessionName), + ClientCorrelationId: DashboardRedactor.Redact(session.ClientCorrelationId), + OpenedAt: session.OpenedAt, + LastClientActivityAt: session.LastClientActivityAt, + LeaseExpiresAt: session.LeaseExpiresAt, + WorkerProcessId: workerClient?.ProcessId, + WorkerState: workerClient?.State, + LastWorkerHeartbeatAt: workerClient?.LastHeartbeatAt, + LastFault: DashboardRedactor.Redact(session.FinalFault)); + } + + private static DashboardWorkerSummary CreateWorkerSummary(GatewaySession session) + { + IWorkerClient workerClient = session.WorkerClient!; + + return new DashboardWorkerSummary( + SessionId: session.SessionId, + ProcessId: workerClient.ProcessId, + State: workerClient.State, + LastHeartbeatAt: workerClient.LastHeartbeatAt, + LastFault: DashboardRedactor.Redact(session.FinalFault)); + } + + private static IReadOnlyList CreateMetricSummaries(GatewayMetricsSnapshot snapshot) + { + List metrics = + [ + new("mxgateway.sessions.open", snapshot.OpenSessions), + new("mxgateway.workers.running", snapshot.WorkersRunning), + new("mxgateway.events.queue.depth", snapshot.EventQueueDepth), + new("mxgateway.sessions.opened", snapshot.SessionsOpened), + new("mxgateway.sessions.closed", snapshot.SessionsClosed), + new("mxgateway.commands.started", snapshot.CommandsStarted), + new("mxgateway.commands.succeeded", snapshot.CommandsSucceeded), + new("mxgateway.commands.failed", snapshot.CommandsFailed), + new("mxgateway.events.received", snapshot.EventsReceived), + new("mxgateway.queues.overflows", snapshot.QueueOverflows), + new("mxgateway.faults", snapshot.Faults), + new("mxgateway.workers.killed", snapshot.WorkerKills), + new("mxgateway.workers.exited", snapshot.WorkerExits), + new("mxgateway.heartbeats.failed", snapshot.HeartbeatFailures), + new("mxgateway.grpc.streams.disconnected", snapshot.StreamDisconnects), + ]; + + metrics.AddRange(snapshot.CommandFailuresByMethod + .OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase) + .Select(entry => new DashboardMetricSummary("mxgateway.commands.failed", entry.Value, entry.Key))); + metrics.AddRange(snapshot.EventsByFamily + .OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase) + .Select(entry => new DashboardMetricSummary("mxgateway.events.received", entry.Value, entry.Key))); + + return metrics; + } + + private IReadOnlyList CreateFaultSummaries( + IReadOnlyList sessions, + DateTimeOffset generatedAt) + { + return sessions + .Where(HasFault) + .Take(ResolveLimit(_recentFaultLimit)) + .Select(session => new DashboardFaultSummary( + Source: session.WorkerClient?.State == WorkerClientState.Faulted ? "Worker" : "Session", + SessionId: session.SessionId, + WorkerProcessId: session.WorkerProcessId, + State: session.WorkerClient?.State == WorkerClientState.Faulted + ? WorkerClientState.Faulted.ToString() + : session.State.ToString(), + Message: DashboardRedactor.Redact(session.FinalFault) ?? "Faulted", + ObservedAt: generatedAt)) + .ToArray(); + } + + private static bool HasFault(GatewaySession session) + { + return session.State == MxGateway.Contracts.Proto.SessionState.Faulted + || session.WorkerClient?.State == WorkerClientState.Faulted + || !string.IsNullOrWhiteSpace(session.FinalFault); + } + + private static int ResolveLimit(int configuredLimit) + { + return configuredLimit < 0 ? 0 : configuredLimit; + } +} diff --git a/src/MxGateway.Server/Dashboard/DashboardWorkerSummary.cs b/src/MxGateway.Server/Dashboard/DashboardWorkerSummary.cs new file mode 100644 index 0000000..563e010 --- /dev/null +++ b/src/MxGateway.Server/Dashboard/DashboardWorkerSummary.cs @@ -0,0 +1,10 @@ +using MxGateway.Server.Workers; + +namespace MxGateway.Server.Dashboard; + +public sealed record DashboardWorkerSummary( + string SessionId, + int? ProcessId, + WorkerClientState State, + DateTimeOffset LastHeartbeatAt, + string? LastFault); diff --git a/src/MxGateway.Server/Dashboard/IDashboardSnapshotService.cs b/src/MxGateway.Server/Dashboard/IDashboardSnapshotService.cs new file mode 100644 index 0000000..452c78d --- /dev/null +++ b/src/MxGateway.Server/Dashboard/IDashboardSnapshotService.cs @@ -0,0 +1,8 @@ +namespace MxGateway.Server.Dashboard; + +public interface IDashboardSnapshotService +{ + DashboardSnapshot GetSnapshot(); + + IAsyncEnumerable WatchSnapshotsAsync(CancellationToken cancellationToken); +} diff --git a/src/MxGateway.Server/GatewayApplication.cs b/src/MxGateway.Server/GatewayApplication.cs index 1279323..887cdcf 100644 --- a/src/MxGateway.Server/GatewayApplication.cs +++ b/src/MxGateway.Server/GatewayApplication.cs @@ -1,5 +1,6 @@ using MxGateway.Contracts; using MxGateway.Server.Configuration; +using MxGateway.Server.Dashboard; using MxGateway.Server.Diagnostics; using MxGateway.Server.Grpc; using MxGateway.Server.Metrics; @@ -36,6 +37,7 @@ public static class GatewayApplication builder.Services.AddSingleton(); builder.Services.AddWorkerProcessLauncher(); builder.Services.AddGatewaySessions(); + builder.Services.AddGatewayDashboard(); return builder; } diff --git a/src/MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotServiceTests.cs b/src/MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotServiceTests.cs new file mode 100644 index 0000000..6c12be0 --- /dev/null +++ b/src/MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotServiceTests.cs @@ -0,0 +1,290 @@ +using Microsoft.Extensions.Options; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Configuration; +using MxGateway.Server.Dashboard; +using MxGateway.Server.Metrics; +using MxGateway.Server.Sessions; +using MxGateway.Server.Workers; + +namespace MxGateway.Tests.Gateway.Dashboard; + +public sealed class DashboardSnapshotServiceTests +{ + [Fact] + public void GetSnapshot_WhenRegistryEmpty_ReturnsEmptyOperationalState() + { + using GatewayMetrics metrics = new(); + DashboardSnapshotService service = CreateService(new SessionRegistry(), metrics); + + DashboardSnapshot snapshot = service.GetSnapshot(); + + Assert.Empty(snapshot.Sessions); + Assert.Empty(snapshot.Workers); + Assert.Empty(snapshot.Faults); + Assert.Contains(snapshot.Metrics, metric => metric.Name == "mxgateway.sessions.open" && metric.Value == 0); + Assert.Equal("Healthy", snapshot.GatewayStatus); + Assert.NotNull(snapshot.Configuration); + } + + [Fact] + public void GetSnapshot_ProjectsActiveAndFaultedSessionsWorkersMetricsAndFaults() + { + SessionRegistry registry = new(); + GatewaySession activeSession = CreateSession( + "session-active", + "client-one", + DateTimeOffset.Parse("2026-04-26T10:00:00Z")); + activeSession.AttachWorkerClient(new FakeWorkerClient("session-active", 1201, WorkerClientState.Ready)); + activeSession.MarkReady(); + GatewaySession faultedSession = CreateSession( + "session-faulted", + "client-two", + DateTimeOffset.Parse("2026-04-26T10:01:00Z")); + faultedSession.AttachWorkerClient(new FakeWorkerClient("session-faulted", 1202, WorkerClientState.Faulted)); + faultedSession.MarkFaulted("worker pipe disconnected"); + registry.TryAdd(activeSession); + registry.TryAdd(faultedSession); + using GatewayMetrics metrics = new(); + metrics.SessionOpened(); + metrics.SessionOpened(); + metrics.CommandStarted("Register"); + metrics.CommandFailed("Register", "WorkerFaulted", TimeSpan.FromMilliseconds(7)); + metrics.EventReceived("session-active", "OnDataChange"); + metrics.Fault("WorkerFaulted"); + DashboardSnapshotService service = CreateService(registry, metrics); + + DashboardSnapshot snapshot = service.GetSnapshot(); + + Assert.Equal(2, snapshot.Sessions.Count); + Assert.Equal("session-faulted", snapshot.Sessions[0].SessionId); + Assert.Equal(SessionState.Faulted, snapshot.Sessions[0].State); + Assert.Equal(2, snapshot.Workers.Count); + Assert.Contains(snapshot.Metrics, metric => metric.Name == "mxgateway.commands.started" && metric.Value == 1); + Assert.Contains( + snapshot.Metrics, + metric => metric.Name == "mxgateway.events.received" + && metric.Dimension == "OnDataChange" + && metric.Value == 1); + DashboardFaultSummary fault = Assert.Single(snapshot.Faults); + Assert.Equal("Worker", fault.Source); + Assert.Equal("session-faulted", fault.SessionId); + Assert.Equal("worker pipe disconnected", fault.Message); + } + + [Fact] + public void GetSnapshot_RedactsSecretsFromSessionAndFaultFields() + { + SessionRegistry registry = new(); + GatewaySession session = CreateSession( + "session-redacted", + "Bearer mxgw_admin_super-secret", + DateTimeOffset.Parse("2026-04-26T10:00:00Z"), + clientSessionName: "password=hunter2", + clientCorrelationId: "token=abc123"); + session.MarkFaulted("secret=credential-value"); + registry.TryAdd(session); + using GatewayMetrics metrics = new(); + DashboardSnapshotService service = CreateService(registry, metrics); + + DashboardSnapshot snapshot = service.GetSnapshot(); + + DashboardSessionSummary summary = Assert.Single(snapshot.Sessions); + Assert.Equal("Bearer mxgw_admin_[redacted]", summary.ClientIdentity); + Assert.Equal("[redacted]", summary.ClientSessionName); + Assert.Equal("[redacted]", summary.ClientCorrelationId); + Assert.Equal("[redacted]", summary.LastFault); + Assert.Equal("[redacted]", Assert.Single(snapshot.Faults).Message); + Assert.Equal("[redacted]", snapshot.Configuration.Authentication.PepperSecretName); + } + + [Fact] + public void GetSnapshot_DoesNotMutateSessionOrWorkerState() + { + SessionRegistry registry = new(); + GatewaySession session = CreateSession( + "session-active", + "client-one", + DateTimeOffset.Parse("2026-04-26T10:00:00Z")); + FakeWorkerClient workerClient = new("session-active", 1201, WorkerClientState.Ready); + session.AttachWorkerClient(workerClient); + session.MarkReady(); + registry.TryAdd(session); + using GatewayMetrics metrics = new(); + DashboardSnapshotService service = CreateService(registry, metrics); + + service.GetSnapshot(); + service.GetSnapshot(); + + Assert.Equal(1, registry.ActiveCount); + Assert.Equal(SessionState.Ready, session.State); + Assert.Equal(WorkerClientState.Ready, workerClient.State); + Assert.Equal(0, workerClient.StartCount); + Assert.Equal(0, workerClient.ShutdownCount); + Assert.Equal(0, workerClient.KillCount); + } + + [Fact] + public void GetSnapshot_AppliesRecentSessionAndFaultLimits() + { + SessionRegistry registry = new(); + GatewaySession olderSession = CreateSession( + "session-older", + "client-one", + DateTimeOffset.Parse("2026-04-26T10:00:00Z")); + GatewaySession newerSession = CreateSession( + "session-newer", + "client-two", + DateTimeOffset.Parse("2026-04-26T10:01:00Z")); + olderSession.MarkFaulted("older fault"); + newerSession.MarkFaulted("newer fault"); + registry.TryAdd(olderSession); + registry.TryAdd(newerSession); + using GatewayMetrics metrics = new(); + DashboardSnapshotService service = CreateService( + registry, + metrics, + new GatewayOptions + { + Dashboard = new DashboardOptions + { + SnapshotIntervalMilliseconds = 1, + RecentSessionLimit = 1, + RecentFaultLimit = 1, + }, + }); + + DashboardSnapshot snapshot = service.GetSnapshot(); + + Assert.Equal("session-newer", Assert.Single(snapshot.Sessions).SessionId); + Assert.Equal("session-newer", Assert.Single(snapshot.Faults).SessionId); + } + + [Fact] + public async Task WatchSnapshotsAsync_WhenSubscriberCancels_DisposesCleanly() + { + using GatewayMetrics metrics = new(); + DashboardSnapshotService service = CreateService( + new SessionRegistry(), + metrics, + new GatewayOptions + { + Dashboard = new DashboardOptions + { + SnapshotIntervalMilliseconds = 1, + }, + }); + using CancellationTokenSource cancellation = new(); + await using IAsyncEnumerator enumerator = service + .WatchSnapshotsAsync(cancellation.Token) + .GetAsyncEnumerator(); + + Assert.True(await enumerator.MoveNextAsync().AsTask().WaitAsync(TimeSpan.FromSeconds(1))); + await cancellation.CancelAsync(); + bool hasNext = await enumerator.MoveNextAsync().AsTask().WaitAsync(TimeSpan.FromSeconds(1)); + + Assert.False(hasNext); + } + + private static DashboardSnapshotService CreateService( + SessionRegistry registry, + GatewayMetrics metrics, + GatewayOptions? options = null) + { + GatewayOptions resolvedOptions = options ?? new GatewayOptions + { + Dashboard = new DashboardOptions + { + SnapshotIntervalMilliseconds = 1, + }, + }; + GatewayConfigurationProvider configurationProvider = new(Options.Create(resolvedOptions)); + + return new DashboardSnapshotService( + registry, + metrics, + configurationProvider, + Options.Create(resolvedOptions)); + } + + private static GatewaySession CreateSession( + string sessionId, + string? clientIdentity, + DateTimeOffset openedAt, + string? clientSessionName = "test-session", + string? clientCorrelationId = "client-correlation") + { + return new GatewaySession( + sessionId, + "mxaccess", + $"mxaccess-gateway-1-{sessionId}", + "nonce", + clientIdentity, + clientSessionName, + clientCorrelationId, + TimeSpan.FromSeconds(30), + TimeSpan.FromSeconds(5), + TimeSpan.FromSeconds(5), + openedAt); + } + + private sealed class FakeWorkerClient( + string sessionId, + int? processId, + WorkerClientState state) : IWorkerClient + { + public string SessionId { get; } = sessionId; + + public int? ProcessId { get; } = processId; + + public WorkerClientState State { get; private set; } = state; + + public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.Parse("2026-04-26T10:02:00Z"); + + public int StartCount { get; private set; } + + public int ShutdownCount { get; private set; } + + public int KillCount { get; private set; } + + public Task StartAsync(CancellationToken cancellationToken) + { + StartCount++; + return Task.CompletedTask; + } + + public Task InvokeAsync( + WorkerCommand command, + TimeSpan timeout, + CancellationToken cancellationToken) + { + return Task.FromResult(new WorkerCommandReply()); + } + + public async IAsyncEnumerable ReadEventsAsync( + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) + { + await Task.CompletedTask; + yield break; + } + + public Task ShutdownAsync( + TimeSpan timeout, + CancellationToken cancellationToken) + { + ShutdownCount++; + State = WorkerClientState.Closed; + return Task.CompletedTask; + } + + public void Kill(string reason) + { + KillCount++; + State = WorkerClientState.Faulted; + } + + public ValueTask DisposeAsync() + { + return ValueTask.CompletedTask; + } + } +}