197 lines
8.3 KiB
C#
197 lines
8.3 KiB
C#
using System.Runtime.CompilerServices;
|
|
using Microsoft.Extensions.Options;
|
|
using MxGateway.Server.Configuration;
|
|
using MxGateway.Server.Metrics;
|
|
using MxGateway.Server.Sessions;
|
|
using MxGateway.Server.Workers;
|
|
|
|
namespace MxGateway.Server.Dashboard;
|
|
|
|
/// <summary>
/// Builds point-in-time <see cref="DashboardSnapshot"/> values from the session registry,
/// the gateway metrics, and the effective configuration, and can stream a fresh snapshot
/// on a fixed interval.
/// </summary>
public sealed class DashboardSnapshotService : IDashboardSnapshotService
{
    private const string HealthyStatus = "Healthy";

    // The assembly version cannot change while the process is running, so it is resolved
    // once instead of on every snapshot (snapshots are produced on every watch tick).
    private static readonly string GatewayVersionText =
        typeof(DashboardSnapshotService).Assembly.GetName().Version?.ToString() ?? "unknown";

    private readonly ISessionRegistry _sessionRegistry;
    private readonly GatewayMetrics _metrics;
    private readonly IGatewayConfigurationProvider _configurationProvider;
    private readonly TimeProvider _timeProvider;
    private readonly DateTimeOffset _gatewayStartedAt;
    private readonly TimeSpan _snapshotInterval;
    private readonly int _recentFaultLimit;
    private readonly int _recentSessionLimit;

    /// <summary>
    /// Initializes the service and records the current time as the gateway start time.
    /// </summary>
    /// <param name="sessionRegistry">Source of the sessions included in each snapshot.</param>
    /// <param name="metrics">Source of the metric values included in each snapshot.</param>
    /// <param name="configurationProvider">Supplies the configuration section of each snapshot.</param>
    /// <param name="options">
    /// Gateway options; the <c>Dashboard</c> section provides the snapshot interval and the
    /// recent-session / recent-fault limits.
    /// </param>
    /// <param name="timeProvider">
    /// Clock and timer source; <see cref="TimeProvider.System"/> is used when <see langword="null"/>.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="sessionRegistry"/>, <paramref name="metrics"/>,
    /// <paramref name="configurationProvider"/>, or <paramref name="options"/> is <see langword="null"/>.
    /// </exception>
    public DashboardSnapshotService(
        ISessionRegistry sessionRegistry,
        GatewayMetrics metrics,
        IGatewayConfigurationProvider configurationProvider,
        IOptions<GatewayOptions> options,
        TimeProvider? timeProvider = null)
    {
        ArgumentNullException.ThrowIfNull(sessionRegistry);
        ArgumentNullException.ThrowIfNull(metrics);
        ArgumentNullException.ThrowIfNull(configurationProvider);
        ArgumentNullException.ThrowIfNull(options);

        _sessionRegistry = sessionRegistry;
        _metrics = metrics;
        _configurationProvider = configurationProvider;

        _timeProvider = timeProvider ?? TimeProvider.System;
        _gatewayStartedAt = _timeProvider.GetUtcNow();

        var dashboard = options.Value.Dashboard;

        // NOTE(review): PeriodicTimer rejects non-positive periods, and the interval is not
        // validated here, so a zero/negative setting only surfaces once WatchSnapshotsAsync
        // creates its timer. Confirm that options validation elsewhere covers this.
        _snapshotInterval = TimeSpan.FromMilliseconds(dashboard.SnapshotIntervalMilliseconds);
        _recentFaultLimit = dashboard.RecentFaultLimit;
        _recentSessionLimit = dashboard.RecentSessionLimit;
    }

    /// <summary>
    /// Produces a snapshot of the gateway as of the current time reported by the time provider.
    /// </summary>
    /// <returns>
    /// A snapshot whose sessions are ordered newest-opened first and truncated to the configured
    /// recent-session limit, with one worker entry per session that has a worker client, the
    /// current metric values, the faults found on the sessions, and the effective configuration.
    /// </returns>
    public DashboardSnapshot GetSnapshot()
    {
        DateTimeOffset generatedAt = _timeProvider.GetUtcNow();

        IReadOnlyList<GatewaySession> sessions = _sessionRegistry.Snapshot()
            .OrderByDescending(session => session.OpenedAt)
            .ToArray();

        IReadOnlyList<DashboardSessionSummary> sessionSummaries = sessions
            .Take(ResolveLimit(_recentSessionLimit))
            .Select(CreateSessionSummary)
            .ToArray();

        IReadOnlyList<DashboardWorkerSummary> workerSummaries = CreateWorkerSummaries(sessions);

        GatewayMetricsSnapshot metricsSnapshot = _metrics.GetSnapshot();

        return new DashboardSnapshot(
            GeneratedAt: generatedAt,
            GatewayStartedAt: _gatewayStartedAt,
            GatewayUptime: generatedAt - _gatewayStartedAt,
            GatewayStatus: HealthyStatus,
            GatewayVersion: GatewayVersionText,
            Sessions: sessionSummaries,
            Workers: workerSummaries,
            Metrics: CreateMetricSummaries(metricsSnapshot),
            Faults: CreateFaultSummaries(sessions, generatedAt),
            Configuration: _configurationProvider.GetEffectiveConfiguration());
    }

    /// <summary>
    /// Streams snapshots: one immediately, then one per snapshot interval.
    /// </summary>
    /// <param name="cancellationToken">
    /// Ends the stream. Cancellation completes the sequence normally instead of throwing.
    /// </param>
    /// <returns>An asynchronous sequence of snapshots.</returns>
    public async IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            yield break;
        }

        yield return GetSnapshot();

        // The timer is created only after the first snapshot has been handed to the consumer.
        using PeriodicTimer timer = new(_snapshotInterval, _timeProvider);

        while (await WaitForNextTickOrStopAsync(timer, cancellationToken).ConfigureAwait(false))
        {
            yield return GetSnapshot();
        }
    }

    /// <summary>
    /// Waits for the next timer tick, translating cancellation into a "stop" result.
    /// </summary>
    /// <returns>
    /// <see langword="true"/> when a tick elapsed; <see langword="false"/> when the timer reported
    /// no further ticks or the wait was canceled.
    /// </returns>
    private static async ValueTask<bool> WaitForNextTickOrStopAsync(
        PeriodicTimer timer,
        CancellationToken cancellationToken)
    {
        try
        {
            return await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the expected way for a watcher to go away; end the stream quietly.
            return false;
        }
    }

    /// <summary>
    /// Maps a session to its dashboard row. Client-supplied text and the final fault are passed
    /// through <see cref="DashboardRedactor"/>; worker fields are <see langword="null"/> when the
    /// session has no worker client.
    /// </summary>
    private static DashboardSessionSummary CreateSessionSummary(GatewaySession session)
    {
        IWorkerClient? workerClient = session.WorkerClient;

        return new DashboardSessionSummary(
            SessionId: session.SessionId,
            BackendName: session.BackendName,
            State: session.State,
            ClientIdentity: DashboardRedactor.Redact(session.ClientIdentity),
            ClientSessionName: DashboardRedactor.Redact(session.ClientSessionName),
            ClientCorrelationId: DashboardRedactor.Redact(session.ClientCorrelationId),
            OpenedAt: session.OpenedAt,
            LastClientActivityAt: session.LastClientActivityAt,
            LeaseExpiresAt: session.LeaseExpiresAt,
            WorkerProcessId: workerClient?.ProcessId,
            WorkerState: workerClient?.State,
            LastWorkerHeartbeatAt: workerClient?.LastHeartbeatAt,
            LastFault: DashboardRedactor.Redact(session.FinalFault));
    }

    /// <summary>
    /// Builds one worker row for every session that currently has a worker client, preserving
    /// the order of <paramref name="sessions"/>.
    /// </summary>
    private static IReadOnlyList<DashboardWorkerSummary> CreateWorkerSummaries(
        IReadOnlyList<GatewaySession> sessions)
    {
        List<DashboardWorkerSummary> summaries = new(sessions.Count);

        foreach (GatewaySession session in sessions)
        {
            // Read the worker client exactly once: the null check and the values copied into
            // the summary then refer to the same instance, and no null-forgiving operator
            // is needed.
            if (session.WorkerClient is { } workerClient)
            {
                summaries.Add(CreateWorkerSummary(session, workerClient));
            }
        }

        return summaries.ToArray();
    }

    /// <summary>
    /// Maps a session and its worker client to a dashboard worker row.
    /// </summary>
    private static DashboardWorkerSummary CreateWorkerSummary(
        GatewaySession session,
        IWorkerClient workerClient)
    {
        return new DashboardWorkerSummary(
            SessionId: session.SessionId,
            ProcessId: workerClient.ProcessId,
            State: workerClient.State,
            LastHeartbeatAt: workerClient.LastHeartbeatAt,
            LastFault: DashboardRedactor.Redact(session.FinalFault));
    }

    /// <summary>
    /// Flattens a metrics snapshot into dashboard rows: the fixed totals first, followed by
    /// per-method command failures and per-family event counts, each ordered by key
    /// (case-insensitive ordinal).
    /// </summary>
    private static IReadOnlyList<DashboardMetricSummary> CreateMetricSummaries(GatewayMetricsSnapshot snapshot)
    {
        List<DashboardMetricSummary> metrics =
        [
            new("mxgateway.sessions.open", snapshot.OpenSessions),
            new("mxgateway.workers.running", snapshot.WorkersRunning),
            new("mxgateway.events.queue.depth", snapshot.EventQueueDepth),
            new("mxgateway.sessions.opened", snapshot.SessionsOpened),
            new("mxgateway.sessions.closed", snapshot.SessionsClosed),
            new("mxgateway.commands.started", snapshot.CommandsStarted),
            new("mxgateway.commands.succeeded", snapshot.CommandsSucceeded),
            new("mxgateway.commands.failed", snapshot.CommandsFailed),
            new("mxgateway.events.received", snapshot.EventsReceived),
            new("mxgateway.queues.overflows", snapshot.QueueOverflows),
            new("mxgateway.faults", snapshot.Faults),
            new("mxgateway.workers.killed", snapshot.WorkerKills),
            new("mxgateway.workers.exited", snapshot.WorkerExits),
            new("mxgateway.heartbeats.failed", snapshot.HeartbeatFailures),
            new("mxgateway.grpc.streams.disconnected", snapshot.StreamDisconnects),
        ];

        // Breakdown rows reuse the name of their total and carry the key as the third argument.
        metrics.AddRange(snapshot.CommandFailuresByMethod
            .OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
            .Select(entry => new DashboardMetricSummary("mxgateway.commands.failed", entry.Value, entry.Key)));
        metrics.AddRange(snapshot.EventsByFamily
            .OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
            .Select(entry => new DashboardMetricSummary("mxgateway.events.received", entry.Value, entry.Key)));

        return metrics;
    }

    /// <summary>
    /// Builds fault rows for the sessions that report a fault, in the order of
    /// <paramref name="sessions"/>, truncated to the configured recent-fault limit.
    /// </summary>
    /// <param name="sessions">Sessions to inspect.</param>
    /// <param name="generatedAt">Snapshot time, used as the observation time of every row.</param>
    private IReadOnlyList<DashboardFaultSummary> CreateFaultSummaries(
        IReadOnlyList<GatewaySession> sessions,
        DateTimeOffset generatedAt)
    {
        return sessions
            .Where(HasFault)
            .Take(ResolveLimit(_recentFaultLimit))
            .Select(session => CreateFaultSummary(session, generatedAt))
            .ToArray();
    }

    /// <summary>
    /// Maps a faulted session to a fault row, attributing the fault to the worker when the
    /// worker client is in the faulted state and to the session otherwise.
    /// </summary>
    private static DashboardFaultSummary CreateFaultSummary(GatewaySession session, DateTimeOffset observedAt)
    {
        // Evaluate the worker state once so Source and State always agree with each other.
        bool workerFaulted = session.WorkerClient?.State == WorkerClientState.Faulted;

        return new DashboardFaultSummary(
            Source: workerFaulted ? "Worker" : "Session",
            SessionId: session.SessionId,
            WorkerProcessId: session.WorkerProcessId,
            State: workerFaulted
                ? WorkerClientState.Faulted.ToString()
                : session.State.ToString(),
            Message: DashboardRedactor.Redact(session.FinalFault) ?? "Faulted",
            ObservedAt: observedAt);
    }

    /// <summary>
    /// Returns <see langword="true"/> when the session is faulted, its worker client is faulted,
    /// or it carries a non-blank final fault message.
    /// </summary>
    private static bool HasFault(GatewaySession session)
    {
        return session.State == MxGateway.Contracts.Proto.SessionState.Faulted
            || session.WorkerClient?.State == WorkerClientState.Faulted
            || !string.IsNullOrWhiteSpace(session.FinalFault);
    }

    /// <summary>
    /// Clamps a configured row limit so that negative values are treated as zero.
    /// </summary>
    private static int ResolveLimit(int configuredLimit)
    {
        return Math.Max(0, configuredLimit);
    }
}
|