Files
mxaccessgw/src/MxGateway.Worker/MxAccess/MxAccessEventQueue.cs
T
2026-04-26 19:04:56 -04:00

181 lines
4.1 KiB
C#

using System;
using System.Collections.Generic;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessEventQueue
{
public const int DefaultCapacity = 10000;
private readonly int capacity;
private readonly Queue<WorkerEvent> events;
private readonly object syncRoot = new();
private ulong lastEventSequence;
private WorkerFault? fault;
public MxAccessEventQueue()
: this(DefaultCapacity)
{
}
public MxAccessEventQueue(int capacity)
{
if (capacity <= 0)
{
throw new ArgumentOutOfRangeException(
nameof(capacity),
"MXAccess event queue capacity must be greater than zero.");
}
this.capacity = capacity;
events = new Queue<WorkerEvent>(capacity);
}
public int Capacity => capacity;
public int Count
{
get
{
lock (syncRoot)
{
return events.Count;
}
}
}
public ulong LastEventSequence
{
get
{
lock (syncRoot)
{
return lastEventSequence;
}
}
}
public bool IsFaulted
{
get
{
lock (syncRoot)
{
return fault is not null;
}
}
}
public WorkerFault? Fault
{
get
{
lock (syncRoot)
{
return fault?.Clone();
}
}
}
public WorkerEvent Enqueue(MxEvent mxEvent)
{
if (mxEvent is null)
{
throw new ArgumentNullException(nameof(mxEvent));
}
lock (syncRoot)
{
if (fault is not null)
{
throw new InvalidOperationException("MXAccess outbound event queue is faulted.");
}
if (events.Count >= capacity)
{
fault = CreateOverflowFault();
throw new MxAccessEventQueueOverflowException(capacity);
}
MxEvent queuedEvent = mxEvent.Clone();
queuedEvent.WorkerSequence = ++lastEventSequence;
queuedEvent.WorkerTimestamp = Timestamp.FromDateTime(DateTime.UtcNow);
WorkerEvent workerEvent = new()
{
Event = queuedEvent,
};
events.Enqueue(workerEvent);
return workerEvent.Clone();
}
}
public bool TryDequeue(out WorkerEvent? workerEvent)
{
lock (syncRoot)
{
if (events.Count == 0)
{
workerEvent = null;
return false;
}
workerEvent = events.Dequeue().Clone();
return true;
}
}
public IReadOnlyList<WorkerEvent> Drain(uint maxEvents)
{
lock (syncRoot)
{
int drainCount = maxEvents == 0
? events.Count
: Math.Min(events.Count, checked((int)Math.Min(maxEvents, int.MaxValue)));
if (drainCount == 0)
{
return Array.Empty<WorkerEvent>();
}
List<WorkerEvent> drained = new(drainCount);
for (int index = 0; index < drainCount; index++)
{
drained.Add(events.Dequeue().Clone());
}
return drained;
}
}
public void RecordFault(WorkerFault workerFault)
{
if (workerFault is null)
{
throw new ArgumentNullException(nameof(workerFault));
}
lock (syncRoot)
{
fault ??= workerFault.Clone();
}
}
private WorkerFault CreateOverflowFault()
{
string message = $"MXAccess outbound event queue reached capacity {capacity}.";
return new WorkerFault
{
Category = WorkerFaultCategory.QueueOverflow,
DiagnosticMessage = message,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = message,
},
};
}
}