using System; using System.Collections.Generic; using MxGateway.Contracts.Proto; using MxGateway.Worker.MxAccess; namespace MxGateway.Worker.Tests.MxAccess; public sealed class MxAccessEventQueueTests { [Fact] public void Enqueue_AssignsMonotonicWorkerSequencesAndPreservesOrder() { MxAccessEventQueue queue = new(capacity: 4); WorkerEvent first = queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10)); WorkerEvent second = queue.Enqueue(CreateEvent(MxEventFamily.OnWriteComplete, itemHandle: 11)); Assert.Equal(1UL, first.Event.WorkerSequence); Assert.Equal(2UL, second.Event.WorkerSequence); Assert.NotNull(first.Event.WorkerTimestamp); Assert.Equal(2, queue.Count); Assert.Equal(2UL, queue.LastEventSequence); Assert.True(queue.TryDequeue(out WorkerEvent? dequeuedFirst)); Assert.True(queue.TryDequeue(out WorkerEvent? dequeuedSecond)); Assert.Equal(10, dequeuedFirst?.Event.ItemHandle); Assert.Equal(11, dequeuedSecond?.Event.ItemHandle); Assert.False(queue.TryDequeue(out _)); } [Fact] public void Drain_RemovesAtMostRequestedEvents() { MxAccessEventQueue queue = new(capacity: 4); queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10)); queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 11)); queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 12)); IReadOnlyList drained = queue.Drain(maxEvents: 2); Assert.Equal(2, drained.Count); Assert.Equal(10, drained[0].Event.ItemHandle); Assert.Equal(11, drained[1].Event.ItemHandle); Assert.Equal(1, queue.Count); } [Fact] public void Enqueue_WhenCapacityIsExceeded_RecordsOverflowFaultAndRejectsNewEvents() { MxAccessEventQueue queue = new(capacity: 1); queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10)); MxAccessEventQueueOverflowException overflow = Assert.Throws( () => queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 11))); Assert.Equal(1, overflow.Capacity); Assert.True(queue.IsFaulted); Assert.Equal(WorkerFaultCategory.QueueOverflow, queue.Fault?.Category); Assert.Equal(ProtocolStatusCode.WorkerUnavailable, queue.Fault?.ProtocolStatus.Code); Assert.Throws( () => queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 12))); } [Fact] public void RecordFault_KeepsFirstFault() { MxAccessEventQueue queue = new(capacity: 1); queue.RecordFault(new WorkerFault { Category = WorkerFaultCategory.MxaccessEventConversionFailed, }); queue.RecordFault(new WorkerFault { Category = WorkerFaultCategory.QueueOverflow, }); Assert.True(queue.IsFaulted); Assert.Equal(WorkerFaultCategory.MxaccessEventConversionFailed, queue.Fault?.Category); } private static MxEvent CreateEvent( MxEventFamily family, int itemHandle) { MxEvent mxEvent = new() { Family = family, SessionId = "session-1", ServerHandle = 1, ItemHandle = itemHandle, }; switch (family) { case MxEventFamily.OnWriteComplete: mxEvent.OnWriteComplete = new OnWriteCompleteEvent(); break; default: mxEvent.OnDataChange = new OnDataChangeEvent(); break; } return mxEvent; } }