Files
mxaccessgw/src/MxGateway.Server/Metrics/GatewayMetrics.cs
T

460 lines
16 KiB
C#

using System.Collections.Concurrent;
using System.Diagnostics.Metrics;
namespace MxGateway.Server.Metrics;
public sealed class GatewayMetrics : IDisposable
{
public const string MeterName = "MxGateway.Server";
private readonly object _syncRoot = new();
private readonly Meter _meter;
private readonly Counter<long> _sessionsOpenedCounter;
private readonly Counter<long> _sessionsClosedCounter;
private readonly Counter<long> _commandsStartedCounter;
private readonly Counter<long> _commandsSucceededCounter;
private readonly Counter<long> _commandsFailedCounter;
private readonly Counter<long> _eventsReceivedCounter;
private readonly Counter<long> _queueOverflowsCounter;
private readonly Counter<long> _faultsCounter;
private readonly Counter<long> _workerKillsCounter;
private readonly Counter<long> _workerExitsCounter;
private readonly Counter<long> _heartbeatFailuresCounter;
private readonly Counter<long> _streamDisconnectsCounter;
private readonly Counter<long> _retryAttemptsCounter;
private readonly Histogram<double> _workerStartupLatencyHistogram;
private readonly Histogram<double> _commandLatencyHistogram;
private readonly Histogram<double> _eventStreamSendLatencyHistogram;
private readonly Dictionary<string, long> _commandFailuresByMethod = new(StringComparer.OrdinalIgnoreCase);
private readonly ConcurrentDictionary<string, long> _eventsByFamily = new(StringComparer.OrdinalIgnoreCase);
private readonly ConcurrentDictionary<string, long> _eventsBySession = new(StringComparer.Ordinal);
private readonly Dictionary<string, long> _retryAttemptsByArea = new(StringComparer.OrdinalIgnoreCase);
private int _openSessions;
private int _workersRunning;
private int _workerEventQueueDepth;
private int _grpcEventStreamQueueDepth;
private long _sessionsOpened;
private long _sessionsClosed;
private long _commandsStarted;
private long _commandsSucceeded;
private long _commandsFailed;
private long _eventsReceived;
private long _queueOverflows;
private long _faults;
private long _workerKills;
private long _workerExits;
private long _heartbeatFailures;
private long _streamDisconnects;
private long _retryAttempts;
private bool _disposed;
/// <summary>
/// Initializes the gateway metrics with OpenTelemetry counters and histograms.
/// </summary>
public GatewayMetrics()
{
_meter = new Meter(MeterName, typeof(GatewayMetrics).Assembly.GetName().Version?.ToString());
_sessionsOpenedCounter = _meter.CreateCounter<long>("mxgateway.sessions.opened");
_sessionsClosedCounter = _meter.CreateCounter<long>("mxgateway.sessions.closed");
_commandsStartedCounter = _meter.CreateCounter<long>("mxgateway.commands.started");
_commandsSucceededCounter = _meter.CreateCounter<long>("mxgateway.commands.succeeded");
_commandsFailedCounter = _meter.CreateCounter<long>("mxgateway.commands.failed");
_eventsReceivedCounter = _meter.CreateCounter<long>("mxgateway.events.received");
_queueOverflowsCounter = _meter.CreateCounter<long>("mxgateway.queues.overflows");
_faultsCounter = _meter.CreateCounter<long>("mxgateway.faults");
_workerKillsCounter = _meter.CreateCounter<long>("mxgateway.workers.killed");
_workerExitsCounter = _meter.CreateCounter<long>("mxgateway.workers.exited");
_heartbeatFailuresCounter = _meter.CreateCounter<long>("mxgateway.heartbeats.failed");
_streamDisconnectsCounter = _meter.CreateCounter<long>("mxgateway.grpc.streams.disconnected");
_retryAttemptsCounter = _meter.CreateCounter<long>("mxgateway.retries.attempted");
_workerStartupLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.workers.startup.duration", "ms");
_commandLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.commands.duration", "ms");
_eventStreamSendLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.events.stream_send.duration", "ms");
_meter.CreateObservableGauge("mxgateway.sessions.open", GetOpenSessions);
_meter.CreateObservableGauge("mxgateway.workers.running", GetWorkersRunning);
_meter.CreateObservableGauge("mxgateway.events.worker_queue.depth", GetWorkerEventQueueDepth);
_meter.CreateObservableGauge("mxgateway.events.grpc_stream_queue.depth", GetGrpcEventStreamQueueDepth);
}
/// <summary>
/// Records that a session has been opened.
/// </summary>
public void SessionOpened()
{
lock (_syncRoot)
{
_openSessions++;
_sessionsOpened++;
}
_sessionsOpenedCounter.Add(1);
}
/// <summary>
/// Records that a session has been closed.
/// </summary>
public void SessionClosed()
{
lock (_syncRoot)
{
if (_openSessions > 0)
{
_openSessions--;
}
_sessionsClosed++;
}
_sessionsClosedCounter.Add(1);
}
/// <summary>
/// Records that a session has been removed from registry.
/// </summary>
public void SessionRemoved()
{
lock (_syncRoot)
{
if (_openSessions > 0)
{
_openSessions--;
}
}
}
/// <summary>
/// Records that a worker process has started and its startup latency.
/// </summary>
/// <param name="startupDuration">Duration elapsed while starting the worker.</param>
public void WorkerStarted(TimeSpan startupDuration)
{
lock (_syncRoot)
{
_workersRunning++;
}
_workerStartupLatencyHistogram.Record(startupDuration.TotalMilliseconds);
}
/// <summary>
/// Records that a worker process has stopped with the given reason.
/// </summary>
/// <param name="reason">Cause of the worker stopping.</param>
public void WorkerStopped(string reason)
{
lock (_syncRoot)
{
if (_workersRunning > 0)
{
_workersRunning--;
}
_workerExits++;
}
_workerExitsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
}
/// <summary>
/// Records that a worker process was killed with the given reason.
/// </summary>
/// <param name="reason">Cause of the worker termination.</param>
public void WorkerKilled(string reason)
{
lock (_syncRoot)
{
_workerKills++;
}
_workerKillsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
}
/// <summary>
/// Records that a command has started for the given method.
/// </summary>
/// <param name="method">Name of the command method.</param>
public void CommandStarted(string method)
{
lock (_syncRoot)
{
_commandsStarted++;
}
_commandsStartedCounter.Add(1, new KeyValuePair<string, object?>("method", method));
}
/// <summary>
/// Records that a command succeeded for the given method and duration.
/// </summary>
/// <param name="method">Name of the command method.</param>
/// <param name="duration">Elapsed time to complete the command.</param>
public void CommandSucceeded(string method, TimeSpan duration)
{
lock (_syncRoot)
{
_commandsSucceeded++;
}
KeyValuePair<string, object?> methodTag = new("method", method);
_commandsSucceededCounter.Add(1, methodTag);
_commandLatencyHistogram.Record(duration.TotalMilliseconds, methodTag);
}
/// <summary>
/// Records that a command failed for the given method, category, and duration.
/// </summary>
/// <param name="method">Name of the command method.</param>
/// <param name="category">Classification of the failure.</param>
/// <param name="duration">Elapsed time before command failed.</param>
public void CommandFailed(string method, string category, TimeSpan duration)
{
lock (_syncRoot)
{
_commandsFailed++;
Increment(_commandFailuresByMethod, method);
}
KeyValuePair<string, object?> methodTag = new("method", method);
KeyValuePair<string, object?> categoryTag = new("category", category);
_commandsFailedCounter.Add(1, methodTag, categoryTag);
_commandLatencyHistogram.Record(duration.TotalMilliseconds, methodTag, categoryTag);
}
/// <summary>
/// Records that an event was received for the given session and family.
/// </summary>
/// <param name="sessionId">Identifier of the session receiving the event.</param>
/// <param name="family">Event family classification.</param>
public void EventReceived(string sessionId, string family)
{
Interlocked.Increment(ref _eventsReceived);
Increment(_eventsByFamily, family);
Increment(_eventsBySession, sessionId);
_eventsReceivedCounter.Add(
1,
new KeyValuePair<string, object?>("family", family));
}
/// <summary>
/// Records the latency of sending an event to a client stream.
/// </summary>
/// <param name="family">Event family name.</param>
/// <param name="duration">Time taken to send the event.</param>
public void RecordEventStreamSend(string family, TimeSpan duration)
{
_eventStreamSendLatencyHistogram.Record(
duration.TotalMilliseconds,
new KeyValuePair<string, object?>("family", family));
}
/// <summary>
/// Sets the worker event queue depth; delegates to SetWorkerEventQueueDepth.
/// </summary>
/// <param name="depth">Queue depth value.</param>
public void SetEventQueueDepth(int depth)
{
SetWorkerEventQueueDepth(depth);
}
/// <summary>
/// Sets the worker event queue depth to the given value.
/// </summary>
/// <param name="depth">Queue depth value.</param>
public void SetWorkerEventQueueDepth(int depth)
{
if (depth < 0)
{
throw new ArgumentOutOfRangeException(nameof(depth), depth, "Queue depth cannot be negative.");
}
lock (_syncRoot)
{
_workerEventQueueDepth = depth;
}
}
/// <summary>
/// Adjusts the gRPC event stream queue depth by the given delta.
/// </summary>
/// <param name="delta">Amount to adjust the queue depth by.</param>
public void AdjustGrpcEventStreamQueueDepth(int delta)
{
lock (_syncRoot)
{
_grpcEventStreamQueueDepth = Math.Max(0, _grpcEventStreamQueueDepth + delta);
}
}
/// <summary>
/// Removes event counters for the given session.
/// </summary>
/// <param name="sessionId">Identifier of the session.</param>
public void RemoveSessionEvents(string sessionId)
{
_eventsBySession.TryRemove(sessionId, out _);
}
/// <summary>
/// Records that a queue overflow occurred for the given queue name.
/// </summary>
/// <param name="queueName">Name of the queue that overflowed.</param>
public void QueueOverflow(string queueName)
{
lock (_syncRoot)
{
_queueOverflows++;
}
_queueOverflowsCounter.Add(1, new KeyValuePair<string, object?>("queue", queueName));
}
/// <summary>
/// Records that a fault occurred in the given category.
/// </summary>
/// <param name="category">Category of the fault.</param>
public void Fault(string category)
{
lock (_syncRoot)
{
_faults++;
}
_faultsCounter.Add(1, new KeyValuePair<string, object?>("category", category));
}
/// <summary>
/// Records that a heartbeat failed for the given session.
/// </summary>
/// <param name="sessionId">Identifier of the session.</param>
public void HeartbeatFailed(string sessionId)
{
lock (_syncRoot)
{
_heartbeatFailures++;
}
_heartbeatFailuresCounter.Add(1, new KeyValuePair<string, object?>("session_id", sessionId));
}
/// <summary>
/// Records that an event stream was disconnected with the given reason.
/// </summary>
/// <param name="reason">Reason for the disconnection.</param>
public void StreamDisconnected(string reason)
{
lock (_syncRoot)
{
_streamDisconnects++;
}
_streamDisconnectsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
}
/// <summary>
/// Records that a retry was attempted in the given area.
/// </summary>
/// <param name="area">Area in which the retry was attempted.</param>
public void RetryAttempted(string area)
{
lock (_syncRoot)
{
_retryAttempts++;
Increment(_retryAttemptsByArea, area);
}
_retryAttemptsCounter.Add(1, new KeyValuePair<string, object?>("area", area));
}
/// <summary>
/// Returns a snapshot of all current metric values.
/// </summary>
public GatewayMetricsSnapshot GetSnapshot()
{
lock (_syncRoot)
{
return new GatewayMetricsSnapshot(
OpenSessions: _openSessions,
WorkersRunning: _workersRunning,
WorkerEventQueueDepth: _workerEventQueueDepth,
GrpcEventStreamQueueDepth: _grpcEventStreamQueueDepth,
SessionsOpened: _sessionsOpened,
SessionsClosed: _sessionsClosed,
CommandsStarted: _commandsStarted,
CommandsSucceeded: _commandsSucceeded,
CommandsFailed: _commandsFailed,
EventsReceived: Interlocked.Read(ref _eventsReceived),
QueueOverflows: _queueOverflows,
Faults: _faults,
WorkerKills: _workerKills,
WorkerExits: _workerExits,
HeartbeatFailures: _heartbeatFailures,
StreamDisconnects: _streamDisconnects,
RetryAttempts: _retryAttempts,
CommandFailuresByMethod: new Dictionary<string, long>(_commandFailuresByMethod, StringComparer.OrdinalIgnoreCase),
EventsByFamily: new Dictionary<string, long>(_eventsByFamily, StringComparer.OrdinalIgnoreCase),
EventsBySession: new Dictionary<string, long>(_eventsBySession, StringComparer.Ordinal),
RetryAttemptsByArea: new Dictionary<string, long>(_retryAttemptsByArea, StringComparer.OrdinalIgnoreCase));
}
}
/// <summary>
/// Disposes the underlying OpenTelemetry meter.
/// </summary>
public void Dispose()
{
if (_disposed)
{
return;
}
_meter.Dispose();
_disposed = true;
}
private int GetOpenSessions()
{
lock (_syncRoot)
{
return _openSessions;
}
}
private int GetWorkersRunning()
{
lock (_syncRoot)
{
return _workersRunning;
}
}
private int GetWorkerEventQueueDepth()
{
lock (_syncRoot)
{
return _workerEventQueueDepth;
}
}
private int GetGrpcEventStreamQueueDepth()
{
lock (_syncRoot)
{
return _grpcEventStreamQueueDepth;
}
}
private static void Increment(Dictionary<string, long> values, string key)
{
values.TryGetValue(key, out long currentValue);
values[key] = currentValue + 1;
}
private static void Increment(ConcurrentDictionary<string, long> values, string key)
{
values.AddOrUpdate(key, 1, static (_, currentValue) => currentValue + 1);
}
}