Implement worker MXAccess event queue

This commit is contained in:
Joseph Doherty
2026-04-26 19:04:56 -04:00
parent 366f57198f
commit dd455089b4
14 changed files with 806 additions and 17 deletions
@@ -842,7 +842,9 @@ public sealed class MxAccessCommandExecutorTests
private sealed class NoopEventSink : IMxAccessEventSink
{
public void Attach(object mxAccessComObject)
public void Attach(
object mxAccessComObject,
string sessionId)
{
}
@@ -0,0 +1,117 @@
using System;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.MxAccess;
namespace MxGateway.Worker.Tests.MxAccess;
public sealed class MxAccessEventMapperTests
{
private readonly MxAccessEventMapper mapper = new();
[Fact]
public void CreateOnDataChange_ConvertsValueTimestampQualityAndStatuses()
{
DateTime timestamp = new(2026, 4, 26, 12, 30, 0, DateTimeKind.Utc);
FakeStatus[] statuses =
{
new()
{
success = -1,
category = 0,
detectedBy = 5,
detail = 0,
},
};
MxEvent mxEvent = mapper.CreateOnDataChange(
"session-1",
serverHandle: 12,
itemHandle: 34,
value: 42,
quality: 192,
timestamp: timestamp,
statuses: statuses);
Assert.Equal(MxEventFamily.OnDataChange, mxEvent.Family);
Assert.Equal("session-1", mxEvent.SessionId);
Assert.Equal(12, mxEvent.ServerHandle);
Assert.Equal(34, mxEvent.ItemHandle);
Assert.Equal(42, mxEvent.Value.Int32Value);
Assert.Equal(192, mxEvent.Quality);
Assert.Equal(timestamp, mxEvent.SourceTimestamp.ToDateTime());
Assert.Equal(MxEvent.BodyOneofCase.OnDataChange, mxEvent.BodyCase);
MxStatusProxy status = Assert.Single(mxEvent.Statuses);
Assert.Equal(-1, status.Success);
Assert.Equal(MxStatusCategory.Ok, status.Category);
Assert.Equal(MxStatusSource.RespondingAutomationObject, status.DetectedBy);
}
[Fact]
public void CreateOnWriteCompleteAndOperationComplete_PreservesDistinctFamilies()
{
MxEvent writeComplete = mapper.CreateOnWriteComplete(
"session-1",
serverHandle: 1,
itemHandle: 2,
statuses: Array.Empty<FakeStatus>());
MxEvent operationComplete = mapper.CreateOperationComplete(
"session-1",
serverHandle: 1,
itemHandle: 2,
statuses: Array.Empty<FakeStatus>());
Assert.Equal(MxEventFamily.OnWriteComplete, writeComplete.Family);
Assert.Equal(MxEvent.BodyOneofCase.OnWriteComplete, writeComplete.BodyCase);
Assert.Equal(MxEventFamily.OperationComplete, operationComplete.Family);
Assert.Equal(MxEvent.BodyOneofCase.OperationComplete, operationComplete.BodyCase);
}
[Fact]
public void CreateOnBufferedDataChange_PreservesRawDataTypeAndArrayMetadata()
{
DateTime firstTimestamp = new(2026, 4, 26, 13, 0, 0, DateTimeKind.Utc);
DateTime secondTimestamp = new(2026, 4, 26, 13, 1, 0, DateTimeKind.Utc);
MxEvent mxEvent = mapper.CreateOnBufferedDataChange(
"session-1",
serverHandle: 10,
itemHandle: 20,
rawDataType: 2,
value: new[] { 7, 8 },
quality: new[] { 192, 0 },
timestamp: new[] { firstTimestamp, secondTimestamp },
statuses: null);
Assert.Equal(MxEventFamily.OnBufferedDataChange, mxEvent.Family);
Assert.Equal(MxDataType.Integer, mxEvent.OnBufferedDataChange.DataType);
Assert.Equal(2, mxEvent.OnBufferedDataChange.RawDataType);
Assert.Equal(MxDataType.Integer, mxEvent.Value.ArrayValue.ElementDataType);
Assert.Equal(new[] { 7, 8 }, mxEvent.Value.ArrayValue.Int32Values.Values);
Assert.Equal(new[] { 192, 0 }, mxEvent.OnBufferedDataChange.QualityValues.Int32Values.Values);
Assert.Equal(2, mxEvent.OnBufferedDataChange.TimestampValues.TimestampValues.Values.Count);
}
[Theory]
[InlineData(-1, MxDataType.Unknown)]
[InlineData(0, MxDataType.NoData)]
[InlineData(1, MxDataType.Boolean)]
[InlineData(2, MxDataType.Integer)]
[InlineData(6, MxDataType.Time)]
[InlineData(15, MxDataType.InternationalizedString)]
[InlineData(999, MxDataType.Unknown)]
public void MapMxDataType_MapsInstalledMxAccessValues(
int rawDataType,
MxDataType expectedDataType)
{
Assert.Equal(expectedDataType, MxAccessEventMapper.MapMxDataType(rawDataType));
}
private sealed class FakeStatus
{
public int success;
public int category;
public int detectedBy;
public int detail;
}
}
@@ -0,0 +1,106 @@
using System;
using System.Collections.Generic;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.MxAccess;
namespace MxGateway.Worker.Tests.MxAccess;
public sealed class MxAccessEventQueueTests
{
[Fact]
public void Enqueue_AssignsMonotonicWorkerSequencesAndPreservesOrder()
{
MxAccessEventQueue queue = new(capacity: 4);
WorkerEvent first = queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10));
WorkerEvent second = queue.Enqueue(CreateEvent(MxEventFamily.OnWriteComplete, itemHandle: 11));
Assert.Equal(1UL, first.Event.WorkerSequence);
Assert.Equal(2UL, second.Event.WorkerSequence);
Assert.NotNull(first.Event.WorkerTimestamp);
Assert.Equal(2, queue.Count);
Assert.Equal(2UL, queue.LastEventSequence);
Assert.True(queue.TryDequeue(out WorkerEvent? dequeuedFirst));
Assert.True(queue.TryDequeue(out WorkerEvent? dequeuedSecond));
Assert.Equal(10, dequeuedFirst?.Event.ItemHandle);
Assert.Equal(11, dequeuedSecond?.Event.ItemHandle);
Assert.False(queue.TryDequeue(out _));
}
[Fact]
public void Drain_RemovesAtMostRequestedEvents()
{
MxAccessEventQueue queue = new(capacity: 4);
queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10));
queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 11));
queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 12));
IReadOnlyList<WorkerEvent> drained = queue.Drain(maxEvents: 2);
Assert.Equal(2, drained.Count);
Assert.Equal(10, drained[0].Event.ItemHandle);
Assert.Equal(11, drained[1].Event.ItemHandle);
Assert.Equal(1, queue.Count);
}
[Fact]
public void Enqueue_WhenCapacityIsExceeded_RecordsOverflowFaultAndRejectsNewEvents()
{
MxAccessEventQueue queue = new(capacity: 1);
queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10));
MxAccessEventQueueOverflowException overflow = Assert.Throws<MxAccessEventQueueOverflowException>(
() => queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 11)));
Assert.Equal(1, overflow.Capacity);
Assert.True(queue.IsFaulted);
Assert.Equal(WorkerFaultCategory.QueueOverflow, queue.Fault?.Category);
Assert.Equal(ProtocolStatusCode.WorkerUnavailable, queue.Fault?.ProtocolStatus.Code);
Assert.Throws<InvalidOperationException>(
() => queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 12)));
}
[Fact]
public void RecordFault_KeepsFirstFault()
{
MxAccessEventQueue queue = new(capacity: 1);
queue.RecordFault(new WorkerFault
{
Category = WorkerFaultCategory.MxaccessEventConversionFailed,
});
queue.RecordFault(new WorkerFault
{
Category = WorkerFaultCategory.QueueOverflow,
});
Assert.True(queue.IsFaulted);
Assert.Equal(WorkerFaultCategory.MxaccessEventConversionFailed, queue.Fault?.Category);
}
private static MxEvent CreateEvent(
MxEventFamily family,
int itemHandle)
{
MxEvent mxEvent = new()
{
Family = family,
SessionId = "session-1",
ServerHandle = 1,
ItemHandle = itemHandle,
};
switch (family)
{
case MxEventFamily.OnWriteComplete:
mxEvent.OnWriteComplete = new OnWriteCompleteEvent();
break;
default:
mxEvent.OnDataChange = new OnDataChangeEvent();
break;
}
return mxEvent;
}
}
@@ -18,7 +18,7 @@ public sealed class MxAccessStaSessionTests
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, eventSink);
WorkerReady ready = await session.StartAsync(workerProcessId: 1234);
WorkerReady ready = await session.StartAsync("session-1", workerProcessId: 1234);
Assert.Equal(1234, ready.WorkerProcessId);
Assert.Equal(MxAccessInteropInfo.ProgId, ready.MxaccessProgid);
@@ -28,6 +28,7 @@ public sealed class MxAccessStaSessionTests
Assert.Equal(runtime.StaThreadId, eventSink.AttachThreadId);
Assert.Equal(ApartmentState.STA, factory.CreateApartmentState);
Assert.Same(factory.CreatedObject, eventSink.AttachedObject);
Assert.Equal("session-1", eventSink.SessionId);
}
[Fact]
@@ -107,10 +108,15 @@ public sealed class MxAccessStaSessionTests
public int? DetachThreadId { get; private set; }
public void Attach(object mxAccessComObject)
public string? SessionId { get; private set; }
public void Attach(
object mxAccessComObject,
string sessionId)
{
AttachedObject = mxAccessComObject;
AttachThreadId = Thread.CurrentThread.ManagedThreadId;
SessionId = sessionId;
}
public void Detach()