using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.Auth.Abstractions.ApiKeys;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Galaxy;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Security.Authentication;
using ZB.MOM.WW.MxGateway.Server.Sessions;
using ZB.MOM.WW.MxGateway.Server.Workers;

namespace ZB.MOM.WW.MxGateway.Server.Dashboard;

/// <summary>
/// Builds point-in-time dashboard snapshots from the session registry, the gateway metrics,
/// the effective configuration, the Galaxy hierarchy cache, and a cached list of API key summaries.
/// </summary>
public sealed class DashboardSnapshotService : IDashboardSnapshotService
{
    private const string HealthyStatus = "Healthy";

    // Resolved once: reading the assembly name on every snapshot repeats work whose result
    // cannot change while the process is running.
    private static readonly string AssemblyVersion =
        typeof(DashboardSnapshotService).Assembly.GetName().Version?.ToString() ?? "unknown";

    private readonly ISessionRegistry _sessionRegistry;
    private readonly GatewayMetrics _metrics;
    private readonly IGatewayConfigurationProvider _configurationProvider;
    private readonly IGalaxyHierarchyCache _galaxyHierarchyCache;
    private readonly IApiKeyAdminStore _apiKeyAdminStore;
    private readonly TimeProvider _timeProvider;
    private readonly DateTimeOffset _gatewayStartedAt;
    private readonly TimeSpan _snapshotInterval;
    private readonly TimeSpan _apiKeySummaryRefreshTimeout = TimeSpan.FromSeconds(2);
    private readonly int _recentFaultLimit;
    private readonly int _recentSessionLimit;
    private readonly ILogger _logger;

    // Single-permit gate: a refresh that finds the gate taken is skipped instead of queued.
    private readonly SemaphoreSlim _apiKeySummaryRefreshGate = new(1, 1);

    // Replaced wholesale by the refresh and read with Volatile.Read when a snapshot is built.
    private IReadOnlyList<DashboardApiKeySummary> _apiKeySummaries = Array.Empty<DashboardApiKeySummary>();

    /// <summary>
    /// Initializes a new instance of the <see cref="DashboardSnapshotService"/> class.
    /// The gateway start time is captured from the time provider during construction.
    /// </summary>
    /// <param name="sessionRegistry">Registry of active gateway sessions.</param>
    /// <param name="metrics">Gateway metrics collector.</param>
    /// <param name="configurationProvider">Gateway configuration provider.</param>
    /// <param name="galaxyHierarchyCache">Galaxy hierarchy cache.</param>
    /// <param name="apiKeyAdminStore">API key administration store.</param>
    /// <param name="options">
    /// Gateway configuration options; the Dashboard section supplies the snapshot interval
    /// and the recent fault / recent session limits.
    /// </param>
    /// <param name="timeProvider">Provider for current time; defaults to system time.</param>
    /// <param name="logger">Optional logger; a no-op logger is used when omitted.</param>
    /// <exception cref="ArgumentNullException">A required dependency is <c>null</c>.</exception>
    // NOTE(review): the generic type argument of IOptions is not visible in this listing -
    // confirm the concrete options type before building.
    public DashboardSnapshotService(
        ISessionRegistry sessionRegistry,
        GatewayMetrics metrics,
        IGatewayConfigurationProvider configurationProvider,
        IGalaxyHierarchyCache galaxyHierarchyCache,
        IApiKeyAdminStore apiKeyAdminStore,
        IOptions options,
        TimeProvider? timeProvider = null,
        ILogger? logger = null)
    {
        _sessionRegistry = sessionRegistry ?? throw new ArgumentNullException(nameof(sessionRegistry));
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
        _configurationProvider = configurationProvider ?? throw new ArgumentNullException(nameof(configurationProvider));
        _galaxyHierarchyCache = galaxyHierarchyCache ?? throw new ArgumentNullException(nameof(galaxyHierarchyCache));
        _apiKeyAdminStore = apiKeyAdminStore ?? throw new ArgumentNullException(nameof(apiKeyAdminStore));
        ArgumentNullException.ThrowIfNull(options);

        _timeProvider = timeProvider ?? TimeProvider.System;
        _gatewayStartedAt = _timeProvider.GetUtcNow();

        var dashboard = options.Value.Dashboard;
        _snapshotInterval = TimeSpan.FromMilliseconds(dashboard.SnapshotIntervalMilliseconds);
        _recentFaultLimit = dashboard.RecentFaultLimit;
        _recentSessionLimit = dashboard.RecentSessionLimit;

        _logger = logger ?? NullLogger.Instance;
    }

    /// <summary>
    /// Gets a current dashboard snapshot of gateway state.
    /// </summary>
    /// <remarks>
    /// Sessions are ordered most recently opened first and truncated to the recent session limit.
    /// Workers are not truncated: every session with a worker client that is not closed is listed.
    /// API key summaries come from the last successful refresh; this method does not query the store.
    /// </remarks>
    /// <returns>Dashboard snapshot.</returns>
    public DashboardSnapshot GetSnapshot()
    {
        DateTimeOffset generatedAt = _timeProvider.GetUtcNow();

        IReadOnlyList<GatewaySession> sessions = _sessionRegistry.Snapshot()
            .OrderByDescending(session => session.OpenedAt)
            .ToArray();

        GatewayMetricsSnapshot metricsSnapshot = _metrics.GetSnapshot();

        IReadOnlyList<DashboardSessionSummary> sessionSummaries = sessions
            .Take(ResolveLimit(_recentSessionLimit))
            .Select(session => CreateSessionSummary(session, metricsSnapshot))
            .ToArray();

        IReadOnlyList<DashboardWorkerSummary> workerSummaries = CreateWorkerSummaries(sessions);

        return new DashboardSnapshot(
            GeneratedAt: generatedAt,
            GatewayStartedAt: _gatewayStartedAt,
            GatewayUptime: generatedAt - _gatewayStartedAt,
            GatewayStatus: HealthyStatus,
            GatewayVersion: AssemblyVersion,
            Sessions: sessionSummaries,
            Workers: workerSummaries,
            Metrics: CreateMetricSummaries(metricsSnapshot),
            Faults: CreateFaultSummaries(sessions, generatedAt),
            ApiKeys: Volatile.Read(ref _apiKeySummaries),
            Configuration: _configurationProvider.GetEffectiveConfiguration(),
            Galaxy: DashboardGalaxyProjector.Project(_galaxyHierarchyCache.Current));
    }

    /// <summary>
    /// Watches dashboard snapshots at regular intervals asynchronously.
    /// </summary>
    /// <remarks>
    /// One snapshot is produced immediately, then one per snapshot interval. Cancellation observed
    /// while waiting for the next tick ends the stream without an exception; cancellation observed
    /// while refreshing API key summaries surfaces as <see cref="OperationCanceledException"/>.
    /// </remarks>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>Async enumerable of dashboard snapshots.</returns>
    public async IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            yield break;
        }

        await RefreshApiKeySummariesAsync(cancellationToken).ConfigureAwait(false);
        yield return GetSnapshot();

        // The timer is created only after the first snapshot has been consumed, so the first
        // interval is measured from that point.
        using PeriodicTimer timer = new(_snapshotInterval, _timeProvider);
        while (await WaitForNextTickAsync(timer, cancellationToken).ConfigureAwait(false))
        {
            await RefreshApiKeySummariesAsync(cancellationToken).ConfigureAwait(false);
            yield return GetSnapshot();
        }
    }

    /// <summary>
    /// Waits for the next timer tick, reporting cancellation as "no further ticks".
    /// </summary>
    /// <param name="timer">Timer driving the snapshot cadence.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns><c>true</c> when a tick fired; <c>false</c> when the wait was cancelled or the timer stopped.</returns>
    private static async ValueTask<bool> WaitForNextTickAsync(
        PeriodicTimer timer,
        CancellationToken cancellationToken)
    {
        try
        {
            return await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            return false;
        }
    }

    /// <summary>
    /// Projects a session into its dashboard row, redacting client-supplied text and the final fault.
    /// </summary>
    /// <param name="session">Session to summarize.</param>
    /// <param name="metricsSnapshot">Metrics snapshot used to look up the session's received event count.</param>
    /// <returns>Session summary; worker fields are <c>null</c> when the session has no worker client.</returns>
    private static DashboardSessionSummary CreateSessionSummary(
        GatewaySession session,
        GatewayMetricsSnapshot metricsSnapshot)
    {
        IWorkerClient? workerClient = session.WorkerClient;

        // A session without an entry leaves eventsReceived at its default of 0.
        metricsSnapshot.EventsBySession.TryGetValue(session.SessionId, out long eventsReceived);

        return new DashboardSessionSummary(
            SessionId: session.SessionId,
            BackendName: session.BackendName,
            State: session.State,
            ClientIdentity: DashboardRedactor.Redact(session.ClientIdentity),
            ClientSessionName: DashboardRedactor.Redact(session.ClientSessionName),
            ClientCorrelationId: DashboardRedactor.Redact(session.ClientCorrelationId),
            OpenedAt: session.OpenedAt,
            LastClientActivityAt: session.LastClientActivityAt,
            LeaseExpiresAt: session.LeaseExpiresAt,
            WorkerProcessId: workerClient?.ProcessId,
            WorkerState: workerClient?.State,
            LastWorkerHeartbeatAt: workerClient?.LastHeartbeatAt,
            EventsReceived: eventsReceived,
            LastFault: DashboardRedactor.Redact(session.FinalFault));
    }

    /// <summary>
    /// Builds one worker row for every session whose worker client is present and not closed.
    /// </summary>
    /// <param name="sessions">Sessions to inspect, in the order the rows should appear.</param>
    /// <returns>Worker summaries in session order.</returns>
    private static IReadOnlyList<DashboardWorkerSummary> CreateWorkerSummaries(
        IReadOnlyList<GatewaySession> sessions)
    {
        List<DashboardWorkerSummary> summaries = [];
        foreach (GatewaySession session in sessions)
        {
            // Bind the worker client once so the row is built from the same instance that
            // passed the filter, with no second property read and no null-forgiving operator.
            if (session.WorkerClient is { State: not WorkerClientState.Closed } workerClient)
            {
                summaries.Add(CreateWorkerSummary(session, workerClient));
            }
        }

        return summaries.ToArray();
    }

    /// <summary>
    /// Projects a session and its worker client into a dashboard worker row.
    /// </summary>
    /// <param name="session">Session that owns the worker.</param>
    /// <param name="workerClient">Worker client already read from the session.</param>
    /// <returns>Worker summary with the session's final fault redacted.</returns>
    private static DashboardWorkerSummary CreateWorkerSummary(
        GatewaySession session,
        IWorkerClient workerClient)
    {
        return new DashboardWorkerSummary(
            SessionId: session.SessionId,
            ProcessId: workerClient.ProcessId,
            State: workerClient.State,
            LastHeartbeatAt: workerClient.LastHeartbeatAt,
            LastFault: DashboardRedactor.Redact(session.FinalFault));
    }

    /// <summary>
    /// Flattens a metrics snapshot into dashboard metric rows: the fixed gauges and counters first,
    /// then per-method command failures and per-family event counts, each sorted by key
    /// (ordinal, case-insensitive).
    /// </summary>
    /// <param name="snapshot">Metrics snapshot to flatten.</param>
    /// <returns>Metric rows in display order.</returns>
    private static IReadOnlyList<DashboardMetricSummary> CreateMetricSummaries(GatewayMetricsSnapshot snapshot)
    {
        List<DashboardMetricSummary> metrics =
        [
            new("mxgateway.sessions.open", snapshot.OpenSessions),
            new("mxgateway.workers.running", snapshot.WorkersRunning),
            new("mxgateway.events.worker_queue.depth", snapshot.WorkerEventQueueDepth),
            new("mxgateway.events.grpc_stream_queue.depth", snapshot.GrpcEventStreamQueueDepth),
            new("mxgateway.sessions.opened", snapshot.SessionsOpened),
            new("mxgateway.sessions.closed", snapshot.SessionsClosed),
            new("mxgateway.commands.started", snapshot.CommandsStarted),
            new("mxgateway.commands.succeeded", snapshot.CommandsSucceeded),
            new("mxgateway.commands.failed", snapshot.CommandsFailed),
            new("mxgateway.events.received", snapshot.EventsReceived),
            new("mxgateway.queues.overflows", snapshot.QueueOverflows),
            new("mxgateway.faults", snapshot.Faults),
            new("mxgateway.workers.killed", snapshot.WorkerKills),
            new("mxgateway.workers.exited", snapshot.WorkerExits),
            new("mxgateway.heartbeats.failed", snapshot.HeartbeatFailures),
            new("mxgateway.grpc.streams.disconnected", snapshot.StreamDisconnects),
        ];

        // The breakdown rows reuse the name of their aggregate row and carry the key as a third value.
        metrics.AddRange(snapshot.CommandFailuresByMethod
            .OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
            .Select(entry => new DashboardMetricSummary("mxgateway.commands.failed", entry.Value, entry.Key)));

        metrics.AddRange(snapshot.EventsByFamily
            .OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
            .Select(entry => new DashboardMetricSummary("mxgateway.events.received", entry.Value, entry.Key)));

        return metrics;
    }

    /// <summary>
    /// Builds fault rows for the first faulted sessions, up to the recent fault limit.
    /// </summary>
    /// <param name="sessions">Sessions to inspect, already in display order.</param>
    /// <param name="generatedAt">Snapshot time; used as the observation time of every row.</param>
    /// <returns>Fault summaries with redacted messages.</returns>
    private IReadOnlyList<DashboardFaultSummary> CreateFaultSummaries(
        IReadOnlyList<GatewaySession> sessions,
        DateTimeOffset generatedAt)
    {
        return sessions
            .Where(HasFault)
            .Take(ResolveLimit(_recentFaultLimit))
            .Select(session =>
            {
                // Decide once whether the worker is the fault source so that Source and State
                // of the same row are always derived from the same observation.
                bool workerFaulted = session.WorkerClient?.State == WorkerClientState.Faulted;

                return new DashboardFaultSummary(
                    Source: workerFaulted ? "Worker" : "Session",
                    SessionId: session.SessionId,
                    WorkerProcessId: session.WorkerProcessId,
                    State: workerFaulted ? WorkerClientState.Faulted.ToString() : session.State.ToString(),
                    Message: DashboardRedactor.Redact(session.FinalFault) ?? "Faulted",
                    ObservedAt: generatedAt);
            })
            .ToArray();
    }

    /// <summary>
    /// Reloads the cached API key summaries from the admin store.
    /// </summary>
    /// <remarks>
    /// The call returns immediately when another refresh holds the gate. A refresh that exceeds the
    /// refresh timeout, or fails for any other reason, is logged as a warning and leaves the
    /// previously cached summaries in place.
    /// </remarks>
    /// <param name="cancellationToken">Caller cancellation token.</param>
    /// <exception cref="OperationCanceledException">The caller's token was cancelled.</exception>
    private async Task RefreshApiKeySummariesAsync(CancellationToken cancellationToken)
    {
        // Zero timeout: try to take the gate without waiting.
        if (!await _apiKeySummaryRefreshGate.WaitAsync(0, cancellationToken).ConfigureAwait(false))
        {
            return;
        }

        try
        {
            using CancellationTokenSource timeout =
                CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            timeout.CancelAfter(_apiKeySummaryRefreshTimeout);

            IReadOnlyList<DashboardApiKeySummary> summaries =
                (await _apiKeyAdminStore.ListAsync(timeout.Token).ConfigureAwait(false))
                .Select(key => new DashboardApiKeySummary(
                    KeyId: key.KeyId,
                    DisplayName: key.DisplayName,
                    Scopes: key.Scopes,
                    Constraints: ApiKeyConstraintSerializer.Deserialize(key.ConstraintsJson),
                    CreatedUtc: key.CreatedUtc,
                    LastUsedUtc: key.LastUsedUtc,
                    RevokedUtc: key.RevokedUtc))
                .ToArray();

            Volatile.Write(ref _apiKeySummaries, summaries);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Caller cancellation is not a refresh failure; let it propagate.
            throw;
        }
        catch (OperationCanceledException)
        {
            // The caller's token is not cancelled here, so the linked timeout must have fired.
            _logger.LogWarning(
                "Timed out refreshing dashboard API key summaries after {Timeout}.",
                _apiKeySummaryRefreshTimeout);
        }
        catch (Exception ex)
        {
            // Best effort: keep serving the previous summaries, but record why the refresh failed.
            _logger.LogWarning(ex, "Failed to refresh dashboard API key summaries.");
        }
        finally
        {
            _apiKeySummaryRefreshGate.Release();
        }
    }

    /// <summary>
    /// Determines whether a session should appear in the fault list: its state is faulted,
    /// its worker client is faulted, or it has a non-blank final fault text.
    /// </summary>
    /// <param name="session">Session to test.</param>
    /// <returns><c>true</c> when any of the fault indicators is set.</returns>
    private static bool HasFault(GatewaySession session)
    {
        return session.State == ZB.MOM.WW.MxGateway.Contracts.Proto.SessionState.Faulted
            || session.WorkerClient?.State == WorkerClientState.Faulted
            || !string.IsNullOrWhiteSpace(session.FinalFault);
    }

    /// <summary>
    /// Clamps a configured limit so that negative values are treated as zero.
    /// </summary>
    /// <param name="configuredLimit">Limit read from configuration.</param>
    /// <returns>The limit, never less than zero.</returns>
    private static int ResolveLimit(int configuredLimit)
    {
        return Math.Max(0, configuredLimit);
    }
}