Issue #15: implement dashboard snapshot service #73
@@ -107,6 +107,8 @@ worker, correlation, command, and client identity fields with redaction applied
|
||||
before values enter log state. `GatewayMetrics` exposes counters, gauges, and
|
||||
histograms through .NET `Meter` and a snapshot API that dashboard services can
|
||||
project without binding to a metrics exporter.
|
||||
`DashboardSnapshotService` projects sessions, workers, metrics, faults, and
|
||||
effective configuration into immutable DTOs for read-only dashboard rendering.
|
||||
|
||||
### Worker Process
|
||||
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
namespace MxGateway.Server.Dashboard;
|
||||
|
||||
public sealed record DashboardFaultSummary(
|
||||
string Source,
|
||||
string? SessionId,
|
||||
int? WorkerProcessId,
|
||||
string State,
|
||||
string Message,
|
||||
DateTimeOffset ObservedAt);
|
||||
@@ -0,0 +1,6 @@
|
||||
namespace MxGateway.Server.Dashboard;
|
||||
|
||||
public sealed record DashboardMetricSummary(
|
||||
string Name,
|
||||
long Value,
|
||||
string? Dimension = null);
|
||||
@@ -0,0 +1,34 @@
|
||||
using MxGateway.Server.Diagnostics;
|
||||
|
||||
namespace MxGateway.Server.Dashboard;
|
||||
|
||||
internal static class DashboardRedactor
|
||||
{
|
||||
private static readonly string[] SensitiveTextMarkers =
|
||||
[
|
||||
"apikey",
|
||||
"api_key",
|
||||
"authorization",
|
||||
"credential",
|
||||
"password",
|
||||
"secret",
|
||||
"token",
|
||||
];
|
||||
|
||||
public static string? Redact(string? value)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(value))
|
||||
{
|
||||
return value;
|
||||
}
|
||||
|
||||
if (value.Contains("mxgw_", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return GatewayLogRedactor.RedactClientIdentity(value);
|
||||
}
|
||||
|
||||
return SensitiveTextMarkers.Any(marker => value.Contains(marker, StringComparison.OrdinalIgnoreCase))
|
||||
? GatewayLogRedactor.RedactedValue
|
||||
: value;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
namespace MxGateway.Server.Dashboard;
|
||||
|
||||
public static class DashboardServiceCollectionExtensions
|
||||
{
|
||||
public static IServiceCollection AddGatewayDashboard(this IServiceCollection services)
|
||||
{
|
||||
services.AddSingleton<IDashboardSnapshotService, DashboardSnapshotService>();
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Workers;
|
||||
|
||||
namespace MxGateway.Server.Dashboard;
|
||||
|
||||
public sealed record DashboardSessionSummary(
|
||||
string SessionId,
|
||||
string BackendName,
|
||||
SessionState State,
|
||||
string? ClientIdentity,
|
||||
string? ClientSessionName,
|
||||
string? ClientCorrelationId,
|
||||
DateTimeOffset OpenedAt,
|
||||
DateTimeOffset LastClientActivityAt,
|
||||
DateTimeOffset? LeaseExpiresAt,
|
||||
int? WorkerProcessId,
|
||||
WorkerClientState? WorkerState,
|
||||
DateTimeOffset? LastWorkerHeartbeatAt,
|
||||
string? LastFault);
|
||||
@@ -0,0 +1,15 @@
|
||||
using MxGateway.Server.Configuration;
|
||||
|
||||
namespace MxGateway.Server.Dashboard;
|
||||
|
||||
public sealed record DashboardSnapshot(
|
||||
DateTimeOffset GeneratedAt,
|
||||
DateTimeOffset GatewayStartedAt,
|
||||
TimeSpan GatewayUptime,
|
||||
string GatewayStatus,
|
||||
string GatewayVersion,
|
||||
IReadOnlyList<DashboardSessionSummary> Sessions,
|
||||
IReadOnlyList<DashboardWorkerSummary> Workers,
|
||||
IReadOnlyList<DashboardMetricSummary> Metrics,
|
||||
IReadOnlyList<DashboardFaultSummary> Faults,
|
||||
EffectiveGatewayConfiguration Configuration);
|
||||
@@ -0,0 +1,196 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using Microsoft.Extensions.Options;
|
||||
using MxGateway.Server.Configuration;
|
||||
using MxGateway.Server.Metrics;
|
||||
using MxGateway.Server.Sessions;
|
||||
using MxGateway.Server.Workers;
|
||||
|
||||
namespace MxGateway.Server.Dashboard;
|
||||
|
||||
public sealed class DashboardSnapshotService : IDashboardSnapshotService
|
||||
{
|
||||
private const string HealthyStatus = "Healthy";
|
||||
|
||||
private readonly ISessionRegistry _sessionRegistry;
|
||||
private readonly GatewayMetrics _metrics;
|
||||
private readonly IGatewayConfigurationProvider _configurationProvider;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly DateTimeOffset _gatewayStartedAt;
|
||||
private readonly TimeSpan _snapshotInterval;
|
||||
private readonly int _recentFaultLimit;
|
||||
private readonly int _recentSessionLimit;
|
||||
|
||||
public DashboardSnapshotService(
|
||||
ISessionRegistry sessionRegistry,
|
||||
GatewayMetrics metrics,
|
||||
IGatewayConfigurationProvider configurationProvider,
|
||||
IOptions<GatewayOptions> options,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
_sessionRegistry = sessionRegistry ?? throw new ArgumentNullException(nameof(sessionRegistry));
|
||||
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
|
||||
_configurationProvider = configurationProvider ?? throw new ArgumentNullException(nameof(configurationProvider));
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
_gatewayStartedAt = _timeProvider.GetUtcNow();
|
||||
_snapshotInterval = TimeSpan.FromMilliseconds(options.Value.Dashboard.SnapshotIntervalMilliseconds);
|
||||
_recentFaultLimit = options.Value.Dashboard.RecentFaultLimit;
|
||||
_recentSessionLimit = options.Value.Dashboard.RecentSessionLimit;
|
||||
}
|
||||
|
||||
public DashboardSnapshot GetSnapshot()
|
||||
{
|
||||
DateTimeOffset generatedAt = _timeProvider.GetUtcNow();
|
||||
IReadOnlyList<GatewaySession> sessions = _sessionRegistry.Snapshot()
|
||||
.OrderByDescending(session => session.OpenedAt)
|
||||
.ToArray();
|
||||
IReadOnlyList<DashboardSessionSummary> sessionSummaries = sessions
|
||||
.Take(ResolveLimit(_recentSessionLimit))
|
||||
.Select(CreateSessionSummary)
|
||||
.ToArray();
|
||||
IReadOnlyList<DashboardWorkerSummary> workerSummaries = sessions
|
||||
.Where(session => session.WorkerClient is not null)
|
||||
.Select(CreateWorkerSummary)
|
||||
.ToArray();
|
||||
GatewayMetricsSnapshot metricsSnapshot = _metrics.GetSnapshot();
|
||||
|
||||
return new DashboardSnapshot(
|
||||
GeneratedAt: generatedAt,
|
||||
GatewayStartedAt: _gatewayStartedAt,
|
||||
GatewayUptime: generatedAt - _gatewayStartedAt,
|
||||
GatewayStatus: HealthyStatus,
|
||||
GatewayVersion: typeof(DashboardSnapshotService).Assembly.GetName().Version?.ToString() ?? "unknown",
|
||||
Sessions: sessionSummaries,
|
||||
Workers: workerSummaries,
|
||||
Metrics: CreateMetricSummaries(metricsSnapshot),
|
||||
Faults: CreateFaultSummaries(sessions, generatedAt),
|
||||
Configuration: _configurationProvider.GetEffectiveConfiguration());
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
yield break;
|
||||
}
|
||||
|
||||
yield return GetSnapshot();
|
||||
|
||||
using PeriodicTimer timer = new(_snapshotInterval, _timeProvider);
|
||||
while (true)
|
||||
{
|
||||
bool hasNext;
|
||||
try
|
||||
{
|
||||
hasNext = await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
yield break;
|
||||
}
|
||||
|
||||
if (!hasNext)
|
||||
{
|
||||
yield break;
|
||||
}
|
||||
|
||||
yield return GetSnapshot();
|
||||
}
|
||||
}
|
||||
|
||||
private static DashboardSessionSummary CreateSessionSummary(GatewaySession session)
|
||||
{
|
||||
IWorkerClient? workerClient = session.WorkerClient;
|
||||
|
||||
return new DashboardSessionSummary(
|
||||
SessionId: session.SessionId,
|
||||
BackendName: session.BackendName,
|
||||
State: session.State,
|
||||
ClientIdentity: DashboardRedactor.Redact(session.ClientIdentity),
|
||||
ClientSessionName: DashboardRedactor.Redact(session.ClientSessionName),
|
||||
ClientCorrelationId: DashboardRedactor.Redact(session.ClientCorrelationId),
|
||||
OpenedAt: session.OpenedAt,
|
||||
LastClientActivityAt: session.LastClientActivityAt,
|
||||
LeaseExpiresAt: session.LeaseExpiresAt,
|
||||
WorkerProcessId: workerClient?.ProcessId,
|
||||
WorkerState: workerClient?.State,
|
||||
LastWorkerHeartbeatAt: workerClient?.LastHeartbeatAt,
|
||||
LastFault: DashboardRedactor.Redact(session.FinalFault));
|
||||
}
|
||||
|
||||
private static DashboardWorkerSummary CreateWorkerSummary(GatewaySession session)
|
||||
{
|
||||
IWorkerClient workerClient = session.WorkerClient!;
|
||||
|
||||
return new DashboardWorkerSummary(
|
||||
SessionId: session.SessionId,
|
||||
ProcessId: workerClient.ProcessId,
|
||||
State: workerClient.State,
|
||||
LastHeartbeatAt: workerClient.LastHeartbeatAt,
|
||||
LastFault: DashboardRedactor.Redact(session.FinalFault));
|
||||
}
|
||||
|
||||
private static IReadOnlyList<DashboardMetricSummary> CreateMetricSummaries(GatewayMetricsSnapshot snapshot)
|
||||
{
|
||||
List<DashboardMetricSummary> metrics =
|
||||
[
|
||||
new("mxgateway.sessions.open", snapshot.OpenSessions),
|
||||
new("mxgateway.workers.running", snapshot.WorkersRunning),
|
||||
new("mxgateway.events.queue.depth", snapshot.EventQueueDepth),
|
||||
new("mxgateway.sessions.opened", snapshot.SessionsOpened),
|
||||
new("mxgateway.sessions.closed", snapshot.SessionsClosed),
|
||||
new("mxgateway.commands.started", snapshot.CommandsStarted),
|
||||
new("mxgateway.commands.succeeded", snapshot.CommandsSucceeded),
|
||||
new("mxgateway.commands.failed", snapshot.CommandsFailed),
|
||||
new("mxgateway.events.received", snapshot.EventsReceived),
|
||||
new("mxgateway.queues.overflows", snapshot.QueueOverflows),
|
||||
new("mxgateway.faults", snapshot.Faults),
|
||||
new("mxgateway.workers.killed", snapshot.WorkerKills),
|
||||
new("mxgateway.workers.exited", snapshot.WorkerExits),
|
||||
new("mxgateway.heartbeats.failed", snapshot.HeartbeatFailures),
|
||||
new("mxgateway.grpc.streams.disconnected", snapshot.StreamDisconnects),
|
||||
];
|
||||
|
||||
metrics.AddRange(snapshot.CommandFailuresByMethod
|
||||
.OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
|
||||
.Select(entry => new DashboardMetricSummary("mxgateway.commands.failed", entry.Value, entry.Key)));
|
||||
metrics.AddRange(snapshot.EventsByFamily
|
||||
.OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
|
||||
.Select(entry => new DashboardMetricSummary("mxgateway.events.received", entry.Value, entry.Key)));
|
||||
|
||||
return metrics;
|
||||
}
|
||||
|
||||
private IReadOnlyList<DashboardFaultSummary> CreateFaultSummaries(
|
||||
IReadOnlyList<GatewaySession> sessions,
|
||||
DateTimeOffset generatedAt)
|
||||
{
|
||||
return sessions
|
||||
.Where(HasFault)
|
||||
.Take(ResolveLimit(_recentFaultLimit))
|
||||
.Select(session => new DashboardFaultSummary(
|
||||
Source: session.WorkerClient?.State == WorkerClientState.Faulted ? "Worker" : "Session",
|
||||
SessionId: session.SessionId,
|
||||
WorkerProcessId: session.WorkerProcessId,
|
||||
State: session.WorkerClient?.State == WorkerClientState.Faulted
|
||||
? WorkerClientState.Faulted.ToString()
|
||||
: session.State.ToString(),
|
||||
Message: DashboardRedactor.Redact(session.FinalFault) ?? "Faulted",
|
||||
ObservedAt: generatedAt))
|
||||
.ToArray();
|
||||
}
|
||||
|
||||
private static bool HasFault(GatewaySession session)
|
||||
{
|
||||
return session.State == MxGateway.Contracts.Proto.SessionState.Faulted
|
||||
|| session.WorkerClient?.State == WorkerClientState.Faulted
|
||||
|| !string.IsNullOrWhiteSpace(session.FinalFault);
|
||||
}
|
||||
|
||||
private static int ResolveLimit(int configuredLimit)
|
||||
{
|
||||
return configuredLimit < 0 ? 0 : configuredLimit;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
using MxGateway.Server.Workers;
|
||||
|
||||
namespace MxGateway.Server.Dashboard;
|
||||
|
||||
public sealed record DashboardWorkerSummary(
|
||||
string SessionId,
|
||||
int? ProcessId,
|
||||
WorkerClientState State,
|
||||
DateTimeOffset LastHeartbeatAt,
|
||||
string? LastFault);
|
||||
@@ -0,0 +1,8 @@
|
||||
namespace MxGateway.Server.Dashboard;
|
||||
|
||||
public interface IDashboardSnapshotService
|
||||
{
|
||||
DashboardSnapshot GetSnapshot();
|
||||
|
||||
IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(CancellationToken cancellationToken);
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
using MxGateway.Contracts;
|
||||
using MxGateway.Server.Configuration;
|
||||
using MxGateway.Server.Dashboard;
|
||||
using MxGateway.Server.Diagnostics;
|
||||
using MxGateway.Server.Grpc;
|
||||
using MxGateway.Server.Metrics;
|
||||
@@ -36,6 +37,7 @@ public static class GatewayApplication
|
||||
builder.Services.AddSingleton<MxAccessGrpcRequestValidator>();
|
||||
builder.Services.AddWorkerProcessLauncher();
|
||||
builder.Services.AddGatewaySessions();
|
||||
builder.Services.AddGatewayDashboard();
|
||||
|
||||
return builder;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,290 @@
|
||||
using Microsoft.Extensions.Options;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Configuration;
|
||||
using MxGateway.Server.Dashboard;
|
||||
using MxGateway.Server.Metrics;
|
||||
using MxGateway.Server.Sessions;
|
||||
using MxGateway.Server.Workers;
|
||||
|
||||
namespace MxGateway.Tests.Gateway.Dashboard;
|
||||
|
||||
public sealed class DashboardSnapshotServiceTests
|
||||
{
|
||||
[Fact]
|
||||
public void GetSnapshot_WhenRegistryEmpty_ReturnsEmptyOperationalState()
|
||||
{
|
||||
using GatewayMetrics metrics = new();
|
||||
DashboardSnapshotService service = CreateService(new SessionRegistry(), metrics);
|
||||
|
||||
DashboardSnapshot snapshot = service.GetSnapshot();
|
||||
|
||||
Assert.Empty(snapshot.Sessions);
|
||||
Assert.Empty(snapshot.Workers);
|
||||
Assert.Empty(snapshot.Faults);
|
||||
Assert.Contains(snapshot.Metrics, metric => metric.Name == "mxgateway.sessions.open" && metric.Value == 0);
|
||||
Assert.Equal("Healthy", snapshot.GatewayStatus);
|
||||
Assert.NotNull(snapshot.Configuration);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetSnapshot_ProjectsActiveAndFaultedSessionsWorkersMetricsAndFaults()
|
||||
{
|
||||
SessionRegistry registry = new();
|
||||
GatewaySession activeSession = CreateSession(
|
||||
"session-active",
|
||||
"client-one",
|
||||
DateTimeOffset.Parse("2026-04-26T10:00:00Z"));
|
||||
activeSession.AttachWorkerClient(new FakeWorkerClient("session-active", 1201, WorkerClientState.Ready));
|
||||
activeSession.MarkReady();
|
||||
GatewaySession faultedSession = CreateSession(
|
||||
"session-faulted",
|
||||
"client-two",
|
||||
DateTimeOffset.Parse("2026-04-26T10:01:00Z"));
|
||||
faultedSession.AttachWorkerClient(new FakeWorkerClient("session-faulted", 1202, WorkerClientState.Faulted));
|
||||
faultedSession.MarkFaulted("worker pipe disconnected");
|
||||
registry.TryAdd(activeSession);
|
||||
registry.TryAdd(faultedSession);
|
||||
using GatewayMetrics metrics = new();
|
||||
metrics.SessionOpened();
|
||||
metrics.SessionOpened();
|
||||
metrics.CommandStarted("Register");
|
||||
metrics.CommandFailed("Register", "WorkerFaulted", TimeSpan.FromMilliseconds(7));
|
||||
metrics.EventReceived("session-active", "OnDataChange");
|
||||
metrics.Fault("WorkerFaulted");
|
||||
DashboardSnapshotService service = CreateService(registry, metrics);
|
||||
|
||||
DashboardSnapshot snapshot = service.GetSnapshot();
|
||||
|
||||
Assert.Equal(2, snapshot.Sessions.Count);
|
||||
Assert.Equal("session-faulted", snapshot.Sessions[0].SessionId);
|
||||
Assert.Equal(SessionState.Faulted, snapshot.Sessions[0].State);
|
||||
Assert.Equal(2, snapshot.Workers.Count);
|
||||
Assert.Contains(snapshot.Metrics, metric => metric.Name == "mxgateway.commands.started" && metric.Value == 1);
|
||||
Assert.Contains(
|
||||
snapshot.Metrics,
|
||||
metric => metric.Name == "mxgateway.events.received"
|
||||
&& metric.Dimension == "OnDataChange"
|
||||
&& metric.Value == 1);
|
||||
DashboardFaultSummary fault = Assert.Single(snapshot.Faults);
|
||||
Assert.Equal("Worker", fault.Source);
|
||||
Assert.Equal("session-faulted", fault.SessionId);
|
||||
Assert.Equal("worker pipe disconnected", fault.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetSnapshot_RedactsSecretsFromSessionAndFaultFields()
|
||||
{
|
||||
SessionRegistry registry = new();
|
||||
GatewaySession session = CreateSession(
|
||||
"session-redacted",
|
||||
"Bearer mxgw_admin_super-secret",
|
||||
DateTimeOffset.Parse("2026-04-26T10:00:00Z"),
|
||||
clientSessionName: "password=hunter2",
|
||||
clientCorrelationId: "token=abc123");
|
||||
session.MarkFaulted("secret=credential-value");
|
||||
registry.TryAdd(session);
|
||||
using GatewayMetrics metrics = new();
|
||||
DashboardSnapshotService service = CreateService(registry, metrics);
|
||||
|
||||
DashboardSnapshot snapshot = service.GetSnapshot();
|
||||
|
||||
DashboardSessionSummary summary = Assert.Single(snapshot.Sessions);
|
||||
Assert.Equal("Bearer mxgw_admin_[redacted]", summary.ClientIdentity);
|
||||
Assert.Equal("[redacted]", summary.ClientSessionName);
|
||||
Assert.Equal("[redacted]", summary.ClientCorrelationId);
|
||||
Assert.Equal("[redacted]", summary.LastFault);
|
||||
Assert.Equal("[redacted]", Assert.Single(snapshot.Faults).Message);
|
||||
Assert.Equal("[redacted]", snapshot.Configuration.Authentication.PepperSecretName);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetSnapshot_DoesNotMutateSessionOrWorkerState()
|
||||
{
|
||||
SessionRegistry registry = new();
|
||||
GatewaySession session = CreateSession(
|
||||
"session-active",
|
||||
"client-one",
|
||||
DateTimeOffset.Parse("2026-04-26T10:00:00Z"));
|
||||
FakeWorkerClient workerClient = new("session-active", 1201, WorkerClientState.Ready);
|
||||
session.AttachWorkerClient(workerClient);
|
||||
session.MarkReady();
|
||||
registry.TryAdd(session);
|
||||
using GatewayMetrics metrics = new();
|
||||
DashboardSnapshotService service = CreateService(registry, metrics);
|
||||
|
||||
service.GetSnapshot();
|
||||
service.GetSnapshot();
|
||||
|
||||
Assert.Equal(1, registry.ActiveCount);
|
||||
Assert.Equal(SessionState.Ready, session.State);
|
||||
Assert.Equal(WorkerClientState.Ready, workerClient.State);
|
||||
Assert.Equal(0, workerClient.StartCount);
|
||||
Assert.Equal(0, workerClient.ShutdownCount);
|
||||
Assert.Equal(0, workerClient.KillCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetSnapshot_AppliesRecentSessionAndFaultLimits()
|
||||
{
|
||||
SessionRegistry registry = new();
|
||||
GatewaySession olderSession = CreateSession(
|
||||
"session-older",
|
||||
"client-one",
|
||||
DateTimeOffset.Parse("2026-04-26T10:00:00Z"));
|
||||
GatewaySession newerSession = CreateSession(
|
||||
"session-newer",
|
||||
"client-two",
|
||||
DateTimeOffset.Parse("2026-04-26T10:01:00Z"));
|
||||
olderSession.MarkFaulted("older fault");
|
||||
newerSession.MarkFaulted("newer fault");
|
||||
registry.TryAdd(olderSession);
|
||||
registry.TryAdd(newerSession);
|
||||
using GatewayMetrics metrics = new();
|
||||
DashboardSnapshotService service = CreateService(
|
||||
registry,
|
||||
metrics,
|
||||
new GatewayOptions
|
||||
{
|
||||
Dashboard = new DashboardOptions
|
||||
{
|
||||
SnapshotIntervalMilliseconds = 1,
|
||||
RecentSessionLimit = 1,
|
||||
RecentFaultLimit = 1,
|
||||
},
|
||||
});
|
||||
|
||||
DashboardSnapshot snapshot = service.GetSnapshot();
|
||||
|
||||
Assert.Equal("session-newer", Assert.Single(snapshot.Sessions).SessionId);
|
||||
Assert.Equal("session-newer", Assert.Single(snapshot.Faults).SessionId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WatchSnapshotsAsync_WhenSubscriberCancels_DisposesCleanly()
|
||||
{
|
||||
using GatewayMetrics metrics = new();
|
||||
DashboardSnapshotService service = CreateService(
|
||||
new SessionRegistry(),
|
||||
metrics,
|
||||
new GatewayOptions
|
||||
{
|
||||
Dashboard = new DashboardOptions
|
||||
{
|
||||
SnapshotIntervalMilliseconds = 1,
|
||||
},
|
||||
});
|
||||
using CancellationTokenSource cancellation = new();
|
||||
await using IAsyncEnumerator<DashboardSnapshot> enumerator = service
|
||||
.WatchSnapshotsAsync(cancellation.Token)
|
||||
.GetAsyncEnumerator();
|
||||
|
||||
Assert.True(await enumerator.MoveNextAsync().AsTask().WaitAsync(TimeSpan.FromSeconds(1)));
|
||||
await cancellation.CancelAsync();
|
||||
bool hasNext = await enumerator.MoveNextAsync().AsTask().WaitAsync(TimeSpan.FromSeconds(1));
|
||||
|
||||
Assert.False(hasNext);
|
||||
}
|
||||
|
||||
private static DashboardSnapshotService CreateService(
|
||||
SessionRegistry registry,
|
||||
GatewayMetrics metrics,
|
||||
GatewayOptions? options = null)
|
||||
{
|
||||
GatewayOptions resolvedOptions = options ?? new GatewayOptions
|
||||
{
|
||||
Dashboard = new DashboardOptions
|
||||
{
|
||||
SnapshotIntervalMilliseconds = 1,
|
||||
},
|
||||
};
|
||||
GatewayConfigurationProvider configurationProvider = new(Options.Create(resolvedOptions));
|
||||
|
||||
return new DashboardSnapshotService(
|
||||
registry,
|
||||
metrics,
|
||||
configurationProvider,
|
||||
Options.Create(resolvedOptions));
|
||||
}
|
||||
|
||||
private static GatewaySession CreateSession(
|
||||
string sessionId,
|
||||
string? clientIdentity,
|
||||
DateTimeOffset openedAt,
|
||||
string? clientSessionName = "test-session",
|
||||
string? clientCorrelationId = "client-correlation")
|
||||
{
|
||||
return new GatewaySession(
|
||||
sessionId,
|
||||
"mxaccess",
|
||||
$"mxaccess-gateway-1-{sessionId}",
|
||||
"nonce",
|
||||
clientIdentity,
|
||||
clientSessionName,
|
||||
clientCorrelationId,
|
||||
TimeSpan.FromSeconds(30),
|
||||
TimeSpan.FromSeconds(5),
|
||||
TimeSpan.FromSeconds(5),
|
||||
openedAt);
|
||||
}
|
||||
|
||||
private sealed class FakeWorkerClient(
|
||||
string sessionId,
|
||||
int? processId,
|
||||
WorkerClientState state) : IWorkerClient
|
||||
{
|
||||
public string SessionId { get; } = sessionId;
|
||||
|
||||
public int? ProcessId { get; } = processId;
|
||||
|
||||
public WorkerClientState State { get; private set; } = state;
|
||||
|
||||
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.Parse("2026-04-26T10:02:00Z");
|
||||
|
||||
public int StartCount { get; private set; }
|
||||
|
||||
public int ShutdownCount { get; private set; }
|
||||
|
||||
public int KillCount { get; private set; }
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
StartCount++;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<WorkerCommandReply> InvokeAsync(
|
||||
WorkerCommand command,
|
||||
TimeSpan timeout,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.FromResult(new WorkerCommandReply());
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
await Task.CompletedTask;
|
||||
yield break;
|
||||
}
|
||||
|
||||
public Task ShutdownAsync(
|
||||
TimeSpan timeout,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ShutdownCount++;
|
||||
State = WorkerClientState.Closed;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public void Kill(string reason)
|
||||
{
|
||||
KillCount++;
|
||||
State = WorkerClientState.Faulted;
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user