307 lines
9.6 KiB
C#
307 lines
9.6 KiB
C#
using System.Diagnostics.Metrics;
|
|
|
|
namespace MxGateway.Server.Metrics;
|
|
|
|
/// <summary>
/// Metrics sink for the gateway server. Every recording method updates an in-process
/// running total (readable as a consistent copy through <see cref="GetSnapshot"/>) and
/// publishes the same measurement through a <see cref="Meter"/> named <see cref="MeterName"/>.
/// </summary>
/// <remarks>
/// All mutable totals are guarded by a single private lock. Instrument calls
/// (<c>Counter.Add</c> / <c>Histogram.Record</c>) are made after the lock is released.
/// </remarks>
public sealed class GatewayMetrics : IDisposable
{
    /// <summary>Name of the <see cref="Meter"/> on which all gateway instruments are created.</summary>
    public const string MeterName = "MxGateway.Server";

    // Guards every mutable field below, including the two breakdown dictionaries.
    private readonly object _syncRoot = new();
    private readonly Meter _meter;

    private readonly Counter<long> _sessionsOpenedCounter;
    private readonly Counter<long> _sessionsClosedCounter;
    private readonly Counter<long> _commandsStartedCounter;
    private readonly Counter<long> _commandsSucceededCounter;
    private readonly Counter<long> _commandsFailedCounter;
    private readonly Counter<long> _eventsReceivedCounter;
    private readonly Counter<long> _queueOverflowsCounter;
    private readonly Counter<long> _faultsCounter;
    private readonly Counter<long> _workerKillsCounter;
    private readonly Counter<long> _workerExitsCounter;
    private readonly Counter<long> _heartbeatFailuresCounter;
    private readonly Counter<long> _streamDisconnectsCounter;
    private readonly Histogram<double> _workerStartupLatencyHistogram;
    private readonly Histogram<double> _commandLatencyHistogram;
    private readonly Histogram<double> _eventStreamSendLatencyHistogram;

    // Per-key breakdowns exposed only through GetSnapshot; keys compare case-insensitively.
    private readonly Dictionary<string, long> _commandFailuresByMethod = new(StringComparer.OrdinalIgnoreCase);
    private readonly Dictionary<string, long> _eventsByFamily = new(StringComparer.OrdinalIgnoreCase);

    // Current-value state, also reported through the observable gauges.
    private int _openSessions;
    private int _workersRunning;
    private int _eventQueueDepth;

    // Lifetime totals mirrored from the counters for snapshot reporting.
    private long _sessionsOpened;
    private long _sessionsClosed;
    private long _commandsStarted;
    private long _commandsSucceeded;
    private long _commandsFailed;
    private long _eventsReceived;
    private long _queueOverflows;
    private long _faults;
    private long _workerKills;
    private long _workerExits;
    private long _heartbeatFailures;
    private long _streamDisconnects;

    private bool _disposed;

    /// <summary>
    /// Creates the <see cref="Meter"/> (versioned with this assembly's version) and registers
    /// all counters, the millisecond-unit duration histograms, and the observable gauges for
    /// open sessions, running workers and event-queue depth.
    /// </summary>
    public GatewayMetrics()
    {
        _meter = new Meter(MeterName, typeof(GatewayMetrics).Assembly.GetName().Version?.ToString());

        _sessionsOpenedCounter = _meter.CreateCounter<long>("mxgateway.sessions.opened");
        _sessionsClosedCounter = _meter.CreateCounter<long>("mxgateway.sessions.closed");
        _commandsStartedCounter = _meter.CreateCounter<long>("mxgateway.commands.started");
        _commandsSucceededCounter = _meter.CreateCounter<long>("mxgateway.commands.succeeded");
        _commandsFailedCounter = _meter.CreateCounter<long>("mxgateway.commands.failed");
        _eventsReceivedCounter = _meter.CreateCounter<long>("mxgateway.events.received");
        _queueOverflowsCounter = _meter.CreateCounter<long>("mxgateway.queues.overflows");
        _faultsCounter = _meter.CreateCounter<long>("mxgateway.faults");
        _workerKillsCounter = _meter.CreateCounter<long>("mxgateway.workers.killed");
        _workerExitsCounter = _meter.CreateCounter<long>("mxgateway.workers.exited");
        _heartbeatFailuresCounter = _meter.CreateCounter<long>("mxgateway.heartbeats.failed");
        _streamDisconnectsCounter = _meter.CreateCounter<long>("mxgateway.grpc.streams.disconnected");

        _workerStartupLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.workers.startup.duration", "ms");
        _commandLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.commands.duration", "ms");
        _eventStreamSendLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.events.stream_send.duration", "ms");

        // Gauges are pulled by listeners at collection time; the callbacks read under the lock.
        _meter.CreateObservableGauge("mxgateway.sessions.open", GetOpenSessions);
        _meter.CreateObservableGauge("mxgateway.workers.running", GetWorkersRunning);
        _meter.CreateObservableGauge("mxgateway.events.queue.depth", GetEventQueueDepth);
    }

    /// <summary>Records a newly opened session: bumps the open-session gauge value and the opened total.</summary>
    public void SessionOpened()
    {
        lock (_syncRoot)
        {
            _openSessions++;
            _sessionsOpened++;
        }

        _sessionsOpenedCounter.Add(1);
    }

    /// <summary>
    /// Records a closed session. The open-session value is decremented but never drops
    /// below zero, so an unmatched close cannot drive the gauge negative.
    /// </summary>
    public void SessionClosed()
    {
        lock (_syncRoot)
        {
            if (_openSessions > 0)
            {
                _openSessions--;
            }

            _sessionsClosed++;
        }

        _sessionsClosedCounter.Add(1);
    }

    /// <summary>Records a started worker and its startup latency (in milliseconds).</summary>
    /// <param name="startupDuration">Time the worker took to start.</param>
    public void WorkerStarted(TimeSpan startupDuration)
    {
        lock (_syncRoot)
        {
            _workersRunning++;
        }

        _workerStartupLatencyHistogram.Record(startupDuration.TotalMilliseconds);
    }

    /// <summary>
    /// Records a worker exit. The running-worker value is decremented (never below zero)
    /// and the exit counter is tagged with <paramref name="reason"/>.
    /// </summary>
    /// <param name="reason">Exit reason, emitted as the <c>reason</c> tag.</param>
    public void WorkerStopped(string reason)
    {
        lock (_syncRoot)
        {
            if (_workersRunning > 0)
            {
                _workersRunning--;
            }

            _workerExits++;
        }

        _workerExitsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
    }

    /// <summary>
    /// Records a forced worker kill, tagged with <paramref name="reason"/>.
    /// This does not change the running-worker value.
    /// </summary>
    /// <remarks>
    /// NOTE(review): presumably callers also report <see cref="WorkerStopped"/> for the
    /// killed worker so the running-worker gauge is decremented — confirm.
    /// </remarks>
    /// <param name="reason">Kill reason, emitted as the <c>reason</c> tag.</param>
    public void WorkerKilled(string reason)
    {
        lock (_syncRoot)
        {
            _workerKills++;
        }

        _workerKillsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
    }

    /// <summary>Records that a command began executing, tagged with <paramref name="method"/>.</summary>
    /// <param name="method">Command/method name, emitted as the <c>method</c> tag.</param>
    public void CommandStarted(string method)
    {
        lock (_syncRoot)
        {
            _commandsStarted++;
        }

        _commandsStartedCounter.Add(1, new KeyValuePair<string, object?>("method", method));
    }

    /// <summary>
    /// Records a successful command. Both the success counter and the command-latency
    /// histogram are tagged with <paramref name="method"/>.
    /// </summary>
    /// <param name="method">Command/method name, emitted as the <c>method</c> tag.</param>
    /// <param name="duration">Command execution time, recorded in milliseconds.</param>
    public void CommandSucceeded(string method, TimeSpan duration)
    {
        lock (_syncRoot)
        {
            _commandsSucceeded++;
        }

        KeyValuePair<string, object?> methodTag = new("method", method);
        _commandsSucceededCounter.Add(1, methodTag);
        _commandLatencyHistogram.Record(duration.TotalMilliseconds, methodTag);
    }

    /// <summary>
    /// Records a failed command. Updates the failed total and the per-method failure
    /// breakdown. The failure counter and the command-latency histogram are tagged with both
    /// <paramref name="method"/> and <paramref name="category"/>.
    /// </summary>
    /// <param name="method">Command/method name; also the key of the per-method breakdown.</param>
    /// <param name="category">Failure category, emitted as the <c>category</c> tag.</param>
    /// <param name="duration">Command execution time, recorded in milliseconds.</param>
    /// <exception cref="ArgumentNullException"><paramref name="method"/> is <see langword="null"/>.</exception>
    public void CommandFailed(string method, string category, TimeSpan duration)
    {
        // Validate before mutating: the breakdown dictionary rejects null keys, and throwing
        // after _commandsFailed++ would leave the totals and the breakdown out of sync.
        ArgumentNullException.ThrowIfNull(method);

        lock (_syncRoot)
        {
            _commandsFailed++;
            Increment(_commandFailuresByMethod, method);
        }

        KeyValuePair<string, object?> methodTag = new("method", method);
        KeyValuePair<string, object?> categoryTag = new("category", category);
        _commandsFailedCounter.Add(1, methodTag, categoryTag);
        _commandLatencyHistogram.Record(duration.TotalMilliseconds, methodTag, categoryTag);
    }

    /// <summary>
    /// Records an event received for a session. Updates the events total and the per-family
    /// breakdown. The counter is tagged with <c>session_id</c> and <c>family</c>.
    /// </summary>
    /// <param name="sessionId">Session identifier, emitted as the <c>session_id</c> tag.</param>
    /// <param name="family">Event family; also the key of the per-family breakdown.</param>
    /// <exception cref="ArgumentNullException"><paramref name="family"/> is <see langword="null"/>.</exception>
    public void EventReceived(string sessionId, string family)
    {
        // Same reasoning as CommandFailed: reject a null key before any total is changed.
        ArgumentNullException.ThrowIfNull(family);

        lock (_syncRoot)
        {
            _eventsReceived++;
            Increment(_eventsByFamily, family);
        }

        // NOTE(review): session_id is an unbounded-cardinality tag; each distinct session yields
        // a separate time series in exporters — confirm this is intended.
        _eventsReceivedCounter.Add(
            1,
            new KeyValuePair<string, object?>("session_id", sessionId),
            new KeyValuePair<string, object?>("family", family));
    }

    /// <summary>
    /// Records how long sending one event on the outbound stream took (in milliseconds),
    /// tagged with <paramref name="family"/>. No snapshot total is kept for this measurement.
    /// </summary>
    /// <param name="family">Event family, emitted as the <c>family</c> tag.</param>
    /// <param name="duration">Send duration.</param>
    public void RecordEventStreamSend(string family, TimeSpan duration)
    {
        _eventStreamSendLatencyHistogram.Record(
            duration.TotalMilliseconds,
            new KeyValuePair<string, object?>("family", family));
    }

    /// <summary>Sets the current event-queue depth reported by the <c>mxgateway.events.queue.depth</c> gauge.</summary>
    /// <param name="depth">Current queue length.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="depth"/> is negative.</exception>
    public void SetEventQueueDepth(int depth)
    {
        if (depth < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(depth), depth, "Queue depth cannot be negative.");
        }

        lock (_syncRoot)
        {
            _eventQueueDepth = depth;
        }
    }

    /// <summary>Records a queue overflow, tagged with the queue name.</summary>
    /// <param name="queueName">Queue identifier, emitted as the <c>queue</c> tag.</param>
    public void QueueOverflow(string queueName)
    {
        lock (_syncRoot)
        {
            _queueOverflows++;
        }

        _queueOverflowsCounter.Add(1, new KeyValuePair<string, object?>("queue", queueName));
    }

    /// <summary>Records a fault, tagged with its category.</summary>
    /// <param name="category">Fault category, emitted as the <c>category</c> tag.</param>
    public void Fault(string category)
    {
        lock (_syncRoot)
        {
            _faults++;
        }

        _faultsCounter.Add(1, new KeyValuePair<string, object?>("category", category));
    }

    /// <summary>Records a failed heartbeat for a session.</summary>
    /// <param name="sessionId">Session identifier, emitted as the <c>session_id</c> tag.</param>
    public void HeartbeatFailed(string sessionId)
    {
        lock (_syncRoot)
        {
            _heartbeatFailures++;
        }

        _heartbeatFailuresCounter.Add(1, new KeyValuePair<string, object?>("session_id", sessionId));
    }

    /// <summary>Records a disconnected gRPC stream, tagged with the disconnect reason.</summary>
    /// <param name="reason">Disconnect reason, emitted as the <c>reason</c> tag.</param>
    public void StreamDisconnected(string reason)
    {
        lock (_syncRoot)
        {
            _streamDisconnects++;
        }

        _streamDisconnectsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
    }

    /// <summary>
    /// Returns a point-in-time copy of all in-process totals, captured under the lock so the
    /// values are mutually consistent. The breakdown dictionaries are fresh copies that keep
    /// case-insensitive key comparison, so callers cannot mutate internal state.
    /// </summary>
    public GatewayMetricsSnapshot GetSnapshot()
    {
        lock (_syncRoot)
        {
            return new GatewayMetricsSnapshot(
                OpenSessions: _openSessions,
                WorkersRunning: _workersRunning,
                EventQueueDepth: _eventQueueDepth,
                SessionsOpened: _sessionsOpened,
                SessionsClosed: _sessionsClosed,
                CommandsStarted: _commandsStarted,
                CommandsSucceeded: _commandsSucceeded,
                CommandsFailed: _commandsFailed,
                EventsReceived: _eventsReceived,
                QueueOverflows: _queueOverflows,
                Faults: _faults,
                WorkerKills: _workerKills,
                WorkerExits: _workerExits,
                HeartbeatFailures: _heartbeatFailures,
                StreamDisconnects: _streamDisconnects,
                CommandFailuresByMethod: new Dictionary<string, long>(_commandFailuresByMethod, StringComparer.OrdinalIgnoreCase),
                EventsByFamily: new Dictionary<string, long>(_eventsByFamily, StringComparer.OrdinalIgnoreCase));
        }
    }

    /// <summary>
    /// Disposes the underlying <see cref="Meter"/>; later calls are no-ops. Nothing else in
    /// this class checks <c>_disposed</c>, so recording methods keep updating the in-process
    /// totals after disposal.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _meter.Dispose();
        _disposed = true;
    }

    // Observable-gauge callback for mxgateway.sessions.open.
    private int GetOpenSessions()
    {
        lock (_syncRoot)
        {
            return _openSessions;
        }
    }

    // Observable-gauge callback for mxgateway.workers.running.
    private int GetWorkersRunning()
    {
        lock (_syncRoot)
        {
            return _workersRunning;
        }
    }

    // Observable-gauge callback for mxgateway.events.queue.depth.
    private int GetEventQueueDepth()
    {
        lock (_syncRoot)
        {
            return _eventQueueDepth;
        }
    }

    // Adds one to the count stored under key, treating a missing key as zero.
    // Callers hold _syncRoot and have already rejected a null key.
    private static void Increment(Dictionary<string, long> values, string key)
    {
        values.TryGetValue(key, out long currentValue);
        values[key] = currentValue + 1;
    }
}
|