From dd455089b43a41654f413868f905035595e394dd Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 26 Apr 2026 19:04:56 -0400 Subject: [PATCH] Implement worker MXAccess event queue --- docs/implementation-plan-mxaccess-worker.md | 2 + docs/mxaccess-worker-instance-design.md | 25 +- .../MxAccess/MxAccessCommandExecutorTests.cs | 4 +- .../MxAccess/MxAccessEventMapperTests.cs | 117 ++++++++++ .../MxAccess/MxAccessEventQueueTests.cs | 106 +++++++++ .../MxAccess/MxAccessStaSessionTests.cs | 10 +- src/MxGateway.Worker/Ipc/WorkerPipeSession.cs | 2 +- .../MxAccess/IMxAccessEventSink.cs | 4 +- .../MxAccess/MxAccessBaseEventSink.cs | 95 +++++++- .../MxAccess/MxAccessEventMapper.cs | 221 ++++++++++++++++++ .../MxAccess/MxAccessEventQueue.cs | 180 ++++++++++++++ .../MxAccessEventQueueOverflowException.cs | 14 ++ .../MxAccess/MxAccessSession.cs | 5 +- .../MxAccess/MxAccessStaSession.cs | 38 ++- 14 files changed, 806 insertions(+), 17 deletions(-) create mode 100644 src/MxGateway.Worker.Tests/MxAccess/MxAccessEventMapperTests.cs create mode 100644 src/MxGateway.Worker.Tests/MxAccess/MxAccessEventQueueTests.cs create mode 100644 src/MxGateway.Worker/MxAccess/MxAccessEventMapper.cs create mode 100644 src/MxGateway.Worker/MxAccess/MxAccessEventQueue.cs create mode 100644 src/MxGateway.Worker/MxAccess/MxAccessEventQueueOverflowException.cs diff --git a/docs/implementation-plan-mxaccess-worker.md b/docs/implementation-plan-mxaccess-worker.md index 3f0d9ae..055176c 100644 --- a/docs/implementation-plan-mxaccess-worker.md +++ b/docs/implementation-plan-mxaccess-worker.md @@ -277,6 +277,8 @@ Live tests: Labels: `area:worker`, `type:feature`, `priority:p0` +Status: implemented. + Deliverables: - handlers for `OnDataChange`, diff --git a/docs/mxaccess-worker-instance-design.md b/docs/mxaccess-worker-instance-design.md index 2ec5ec5..12c9322 100644 --- a/docs/mxaccess-worker-instance-design.md +++ b/docs/mxaccess-worker-instance-design.md @@ -348,9 +348,28 @@ Event handling rules: - Enqueue to the outbound event queue. - Return quickly to preserve message pumping. -If event conversion throws, catch it inside the event handler, enqueue a -structured `WorkerFault` or diagnostic event, and keep the worker alive only if -the fault policy allows it. +`MxAccessBaseEventSink` implements the COM connection-point handlers and keeps +the handlers limited to event argument conversion plus enqueue. It uses +`MxAccessEventMapper` to create `MxEvent` DTOs for `OnDataChange`, +`OnWriteComplete`, `OperationComplete`, and `OnBufferedDataChange`. The mapper +converts scalar and array values through `VariantConverter`, converts +`MXSTATUS_PROXY[]` through `MxStatusProxyConverter`, and maps installed +`MxDataType` values to the public protobuf enum while preserving the raw data +type on buffered events. `OperationComplete` is only emitted from the native +`OperationComplete` handler; write completion does not synthesize it. + +`MxAccessEventQueue` is the bounded outbound event queue for one worker +session. It assigns the monotonic `WorkerSequence` and `WorkerTimestamp` when an +event is accepted, preserving the order in which MXAccess handlers enqueue +events. The default capacity is `10000`. When the queue reaches capacity it +records a `WorkerFaultCategory.QueueOverflow` fault and rejects further events. +The event handler catches conversion and enqueue failures, records the first +fault on the queue, and returns to the STA message pump instead of writing to +the pipe. + +If event conversion throws, catch it inside the event handler, record a +structured `WorkerFault`, and keep the worker alive only if the fault policy +allows it. ## Command Queue diff --git a/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs b/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs index af542b8..61bdb7e 100644 --- a/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs +++ b/src/MxGateway.Worker.Tests/MxAccess/MxAccessCommandExecutorTests.cs @@ -842,7 +842,9 @@ public sealed class MxAccessCommandExecutorTests private sealed class NoopEventSink : IMxAccessEventSink { - public void Attach(object mxAccessComObject) + public void Attach( + object mxAccessComObject, + string sessionId) { } diff --git a/src/MxGateway.Worker.Tests/MxAccess/MxAccessEventMapperTests.cs b/src/MxGateway.Worker.Tests/MxAccess/MxAccessEventMapperTests.cs new file mode 100644 index 0000000..3d1dbb5 --- /dev/null +++ b/src/MxGateway.Worker.Tests/MxAccess/MxAccessEventMapperTests.cs @@ -0,0 +1,117 @@ +using System; +using MxGateway.Contracts.Proto; +using MxGateway.Worker.MxAccess; + +namespace MxGateway.Worker.Tests.MxAccess; + +public sealed class MxAccessEventMapperTests +{ + private readonly MxAccessEventMapper mapper = new(); + + [Fact] + public void CreateOnDataChange_ConvertsValueTimestampQualityAndStatuses() + { + DateTime timestamp = new(2026, 4, 26, 12, 30, 0, DateTimeKind.Utc); + FakeStatus[] statuses = + { + new() + { + success = -1, + category = 0, + detectedBy = 5, + detail = 0, + }, + }; + + MxEvent mxEvent = mapper.CreateOnDataChange( + "session-1", + serverHandle: 12, + itemHandle: 34, + value: 42, + quality: 192, + timestamp: timestamp, + statuses: statuses); + + Assert.Equal(MxEventFamily.OnDataChange, mxEvent.Family); + Assert.Equal("session-1", mxEvent.SessionId); + Assert.Equal(12, mxEvent.ServerHandle); + Assert.Equal(34, mxEvent.ItemHandle); + Assert.Equal(42, mxEvent.Value.Int32Value); + Assert.Equal(192, mxEvent.Quality); + Assert.Equal(timestamp, mxEvent.SourceTimestamp.ToDateTime()); + Assert.Equal(MxEvent.BodyOneofCase.OnDataChange, mxEvent.BodyCase); + + MxStatusProxy status = Assert.Single(mxEvent.Statuses); + Assert.Equal(-1, status.Success); + Assert.Equal(MxStatusCategory.Ok, status.Category); + Assert.Equal(MxStatusSource.RespondingAutomationObject, status.DetectedBy); + } + + [Fact] + public void CreateOnWriteCompleteAndOperationComplete_PreservesDistinctFamilies() + { + MxEvent writeComplete = mapper.CreateOnWriteComplete( + "session-1", + serverHandle: 1, + itemHandle: 2, + statuses: Array.Empty()); + MxEvent operationComplete = mapper.CreateOperationComplete( + "session-1", + serverHandle: 1, + itemHandle: 2, + statuses: Array.Empty()); + + Assert.Equal(MxEventFamily.OnWriteComplete, writeComplete.Family); + Assert.Equal(MxEvent.BodyOneofCase.OnWriteComplete, writeComplete.BodyCase); + Assert.Equal(MxEventFamily.OperationComplete, operationComplete.Family); + Assert.Equal(MxEvent.BodyOneofCase.OperationComplete, operationComplete.BodyCase); + } + + [Fact] + public void CreateOnBufferedDataChange_PreservesRawDataTypeAndArrayMetadata() + { + DateTime firstTimestamp = new(2026, 4, 26, 13, 0, 0, DateTimeKind.Utc); + DateTime secondTimestamp = new(2026, 4, 26, 13, 1, 0, DateTimeKind.Utc); + + MxEvent mxEvent = mapper.CreateOnBufferedDataChange( + "session-1", + serverHandle: 10, + itemHandle: 20, + rawDataType: 2, + value: new[] { 7, 8 }, + quality: new[] { 192, 0 }, + timestamp: new[] { firstTimestamp, secondTimestamp }, + statuses: null); + + Assert.Equal(MxEventFamily.OnBufferedDataChange, mxEvent.Family); + Assert.Equal(MxDataType.Integer, mxEvent.OnBufferedDataChange.DataType); + Assert.Equal(2, mxEvent.OnBufferedDataChange.RawDataType); + Assert.Equal(MxDataType.Integer, mxEvent.Value.ArrayValue.ElementDataType); + Assert.Equal(new[] { 7, 8 }, mxEvent.Value.ArrayValue.Int32Values.Values); + Assert.Equal(new[] { 192, 0 }, mxEvent.OnBufferedDataChange.QualityValues.Int32Values.Values); + Assert.Equal(2, mxEvent.OnBufferedDataChange.TimestampValues.TimestampValues.Values.Count); + } + + [Theory] + [InlineData(-1, MxDataType.Unknown)] + [InlineData(0, MxDataType.NoData)] + [InlineData(1, MxDataType.Boolean)] + [InlineData(2, MxDataType.Integer)] + [InlineData(6, MxDataType.Time)] + [InlineData(15, MxDataType.InternationalizedString)] + [InlineData(999, MxDataType.Unknown)] + public void MapMxDataType_MapsInstalledMxAccessValues( + int rawDataType, + MxDataType expectedDataType) + { + Assert.Equal(expectedDataType, MxAccessEventMapper.MapMxDataType(rawDataType)); + } + + private sealed class FakeStatus + { + public int success; + public int category; + public int detectedBy; + public int detail; + } +} diff --git a/src/MxGateway.Worker.Tests/MxAccess/MxAccessEventQueueTests.cs b/src/MxGateway.Worker.Tests/MxAccess/MxAccessEventQueueTests.cs new file mode 100644 index 0000000..72b61ee --- /dev/null +++ b/src/MxGateway.Worker.Tests/MxAccess/MxAccessEventQueueTests.cs @@ -0,0 +1,106 @@ +using System; +using System.Collections.Generic; +using MxGateway.Contracts.Proto; +using MxGateway.Worker.MxAccess; + +namespace MxGateway.Worker.Tests.MxAccess; + +public sealed class MxAccessEventQueueTests +{ + [Fact] + public void Enqueue_AssignsMonotonicWorkerSequencesAndPreservesOrder() + { + MxAccessEventQueue queue = new(capacity: 4); + + WorkerEvent first = queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10)); + WorkerEvent second = queue.Enqueue(CreateEvent(MxEventFamily.OnWriteComplete, itemHandle: 11)); + + Assert.Equal(1UL, first.Event.WorkerSequence); + Assert.Equal(2UL, second.Event.WorkerSequence); + Assert.NotNull(first.Event.WorkerTimestamp); + Assert.Equal(2, queue.Count); + Assert.Equal(2UL, queue.LastEventSequence); + + Assert.True(queue.TryDequeue(out WorkerEvent? dequeuedFirst)); + Assert.True(queue.TryDequeue(out WorkerEvent? dequeuedSecond)); + Assert.Equal(10, dequeuedFirst?.Event.ItemHandle); + Assert.Equal(11, dequeuedSecond?.Event.ItemHandle); + Assert.False(queue.TryDequeue(out _)); + } + + [Fact] + public void Drain_RemovesAtMostRequestedEvents() + { + MxAccessEventQueue queue = new(capacity: 4); + queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10)); + queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 11)); + queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 12)); + + IReadOnlyList drained = queue.Drain(maxEvents: 2); + + Assert.Equal(2, drained.Count); + Assert.Equal(10, drained[0].Event.ItemHandle); + Assert.Equal(11, drained[1].Event.ItemHandle); + Assert.Equal(1, queue.Count); + } + + [Fact] + public void Enqueue_WhenCapacityIsExceeded_RecordsOverflowFaultAndRejectsNewEvents() + { + MxAccessEventQueue queue = new(capacity: 1); + queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10)); + + MxAccessEventQueueOverflowException overflow = Assert.Throws( + () => queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 11))); + + Assert.Equal(1, overflow.Capacity); + Assert.True(queue.IsFaulted); + Assert.Equal(WorkerFaultCategory.QueueOverflow, queue.Fault?.Category); + Assert.Equal(ProtocolStatusCode.WorkerUnavailable, queue.Fault?.ProtocolStatus.Code); + Assert.Throws( + () => queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 12))); + } + + [Fact] + public void RecordFault_KeepsFirstFault() + { + MxAccessEventQueue queue = new(capacity: 1); + queue.RecordFault(new WorkerFault + { + Category = WorkerFaultCategory.MxaccessEventConversionFailed, + }); + queue.RecordFault(new WorkerFault + { + Category = WorkerFaultCategory.QueueOverflow, + }); + + Assert.True(queue.IsFaulted); + Assert.Equal(WorkerFaultCategory.MxaccessEventConversionFailed, queue.Fault?.Category); + } + + private static MxEvent CreateEvent( + MxEventFamily family, + int itemHandle) + { + MxEvent mxEvent = new() + { + Family = family, + SessionId = "session-1", + ServerHandle = 1, + ItemHandle = itemHandle, + }; + + switch (family) + { + case MxEventFamily.OnWriteComplete: + mxEvent.OnWriteComplete = new OnWriteCompleteEvent(); + break; + + default: + mxEvent.OnDataChange = new OnDataChangeEvent(); + break; + } + + return mxEvent; + } +} diff --git a/src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs b/src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs index 67783a3..797e611 100644 --- a/src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs +++ b/src/MxGateway.Worker.Tests/MxAccess/MxAccessStaSessionTests.cs @@ -18,7 +18,7 @@ public sealed class MxAccessStaSessionTests using StaRuntime runtime = CreateRuntime(); using MxAccessStaSession session = new(runtime, factory, eventSink); - WorkerReady ready = await session.StartAsync(workerProcessId: 1234); + WorkerReady ready = await session.StartAsync("session-1", workerProcessId: 1234); Assert.Equal(1234, ready.WorkerProcessId); Assert.Equal(MxAccessInteropInfo.ProgId, ready.MxaccessProgid); @@ -28,6 +28,7 @@ public sealed class MxAccessStaSessionTests Assert.Equal(runtime.StaThreadId, eventSink.AttachThreadId); Assert.Equal(ApartmentState.STA, factory.CreateApartmentState); Assert.Same(factory.CreatedObject, eventSink.AttachedObject); + Assert.Equal("session-1", eventSink.SessionId); } [Fact] @@ -107,10 +108,15 @@ public sealed class MxAccessStaSessionTests public int? DetachThreadId { get; private set; } - public void Attach(object mxAccessComObject) + public string? SessionId { get; private set; } + + public void Attach( + object mxAccessComObject, + string sessionId) { AttachedObject = mxAccessComObject; AttachThreadId = Thread.CurrentThread.ManagedThreadId; + SessionId = sessionId; } public void Detach() diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs index 6cb9349..d628faf 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs @@ -235,7 +235,7 @@ public sealed class WorkerPipeSession try { return await _mxAccessStaSession - .StartAsync(_processIdProvider(), cancellationToken) + .StartAsync(_options.SessionId, _processIdProvider(), cancellationToken) .ConfigureAwait(false); } catch diff --git a/src/MxGateway.Worker/MxAccess/IMxAccessEventSink.cs b/src/MxGateway.Worker/MxAccess/IMxAccessEventSink.cs index 6e36122..dc8e279 100644 --- a/src/MxGateway.Worker/MxAccess/IMxAccessEventSink.cs +++ b/src/MxGateway.Worker/MxAccess/IMxAccessEventSink.cs @@ -2,7 +2,9 @@ namespace MxGateway.Worker.MxAccess; public interface IMxAccessEventSink { - void Attach(object mxAccessComObject); + void Attach( + object mxAccessComObject, + string sessionId); void Detach(); } diff --git a/src/MxGateway.Worker/MxAccess/MxAccessBaseEventSink.cs b/src/MxGateway.Worker/MxAccess/MxAccessBaseEventSink.cs index 65aa755..ef5dbfc 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessBaseEventSink.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessBaseEventSink.cs @@ -1,13 +1,39 @@ +using System; using ArchestrA.MxAccess; +using Proto = MxGateway.Contracts.Proto; namespace MxGateway.Worker.MxAccess; public sealed class MxAccessBaseEventSink : IMxAccessEventSink { + private readonly MxAccessEventMapper eventMapper; + private readonly MxAccessEventQueue eventQueue; private LMXProxyServerClass? server; + private string sessionId = string.Empty; - public void Attach(object mxAccessComObject) + public MxAccessBaseEventSink() + : this(new MxAccessEventQueue()) { + } + + public MxAccessBaseEventSink(MxAccessEventQueue eventQueue) + : this(eventQueue, new MxAccessEventMapper()) + { + } + + public MxAccessBaseEventSink( + MxAccessEventQueue eventQueue, + MxAccessEventMapper eventMapper) + { + this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue)); + this.eventMapper = eventMapper ?? throw new ArgumentNullException(nameof(eventMapper)); + } + + public void Attach( + object mxAccessComObject, + string sessionId) + { + this.sessionId = sessionId ?? string.Empty; server = (LMXProxyServerClass)mxAccessComObject; server.OnDataChange += OnDataChange; server.OnWriteComplete += OnWriteComplete; @@ -27,9 +53,10 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink server.OperationComplete -= OperationComplete; server.OnBufferedDataChange -= OnBufferedDataChange; server = null; + sessionId = string.Empty; } - private static void OnDataChange( + private void OnDataChange( int hLMXServerHandle, int phItemHandle, object pvItemValue, @@ -37,23 +64,44 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink object pftItemTimeStamp, ref MXSTATUS_PROXY[] pVars) { + MXSTATUS_PROXY[] statuses = pVars; + EnqueueEvent(() => eventMapper.CreateOnDataChange( + sessionId, + hLMXServerHandle, + phItemHandle, + pvItemValue, + pwItemQuality, + pftItemTimeStamp, + statuses)); } - private static void OnWriteComplete( + private void OnWriteComplete( int hLMXServerHandle, int phItemHandle, ref MXSTATUS_PROXY[] pVars) { + MXSTATUS_PROXY[] statuses = pVars; + EnqueueEvent(() => eventMapper.CreateOnWriteComplete( + sessionId, + hLMXServerHandle, + phItemHandle, + statuses)); } - private static void OperationComplete( + private void OperationComplete( int hLMXServerHandle, int phItemHandle, ref MXSTATUS_PROXY[] pVars) { + MXSTATUS_PROXY[] statuses = pVars; + EnqueueEvent(() => eventMapper.CreateOperationComplete( + sessionId, + hLMXServerHandle, + phItemHandle, + statuses)); } - private static void OnBufferedDataChange( + private void OnBufferedDataChange( int hLMXServerHandle, int phItemHandle, MxDataType dtDataType, @@ -62,5 +110,42 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink object pftItemTimeStamp, ref MXSTATUS_PROXY[] pVars) { + MXSTATUS_PROXY[] statuses = pVars; + EnqueueEvent(() => eventMapper.CreateOnBufferedDataChange( + sessionId, + hLMXServerHandle, + phItemHandle, + (int)dtDataType, + pvItemValue, + pwItemQuality, + pftItemTimeStamp, + statuses)); + } + + private void EnqueueEvent(Func createEvent) + { + try + { + eventQueue.Enqueue(createEvent()); + } + catch (Exception exception) + { + eventQueue.RecordFault(CreateEventConversionFault(exception)); + } + } + + private Proto.WorkerFault CreateEventConversionFault(Exception exception) + { + return new Proto.WorkerFault + { + Category = Proto.WorkerFaultCategory.MxaccessEventConversionFailed, + ExceptionType = exception.GetType().FullName ?? string.Empty, + DiagnosticMessage = $"{exception.GetType().FullName}: HRESULT 0x{unchecked((uint)exception.HResult):X8}", + ProtocolStatus = new Proto.ProtocolStatus + { + Code = Proto.ProtocolStatusCode.MxaccessFailure, + Message = "MXAccess event conversion failed.", + }, + }; } } diff --git a/src/MxGateway.Worker/MxAccess/MxAccessEventMapper.cs b/src/MxGateway.Worker/MxAccess/MxAccessEventMapper.cs new file mode 100644 index 0000000..0fe0fef --- /dev/null +++ b/src/MxGateway.Worker/MxAccess/MxAccessEventMapper.cs @@ -0,0 +1,221 @@ +using System; +using MxGateway.Contracts.Proto; +using MxGateway.Worker.Conversion; + +namespace MxGateway.Worker.MxAccess; + +public sealed class MxAccessEventMapper +{ + private readonly VariantConverter variantConverter; + private readonly MxStatusProxyConverter statusProxyConverter; + + public MxAccessEventMapper() + : this(new VariantConverter(), new MxStatusProxyConverter()) + { + } + + public MxAccessEventMapper( + VariantConverter variantConverter, + MxStatusProxyConverter statusProxyConverter) + { + this.variantConverter = variantConverter ?? throw new ArgumentNullException(nameof(variantConverter)); + this.statusProxyConverter = statusProxyConverter ?? throw new ArgumentNullException(nameof(statusProxyConverter)); + } + + public MxEvent CreateOnDataChange( + string sessionId, + int serverHandle, + int itemHandle, + object? value, + int quality, + object? timestamp, + Array? statuses) + { + MxEvent mxEvent = CreateBaseEvent( + MxEventFamily.OnDataChange, + sessionId, + serverHandle, + itemHandle, + statuses); + mxEvent.Value = variantConverter.Convert(value); + mxEvent.Quality = quality; + ApplySourceTimestamp(mxEvent, timestamp); + mxEvent.OnDataChange = new OnDataChangeEvent(); + + return mxEvent; + } + + public MxEvent CreateOnWriteComplete( + string sessionId, + int serverHandle, + int itemHandle, + Array? statuses) + { + MxEvent mxEvent = CreateBaseEvent( + MxEventFamily.OnWriteComplete, + sessionId, + serverHandle, + itemHandle, + statuses); + mxEvent.OnWriteComplete = new OnWriteCompleteEvent(); + + return mxEvent; + } + + public MxEvent CreateOperationComplete( + string sessionId, + int serverHandle, + int itemHandle, + Array? statuses) + { + MxEvent mxEvent = CreateBaseEvent( + MxEventFamily.OperationComplete, + sessionId, + serverHandle, + itemHandle, + statuses); + mxEvent.OperationComplete = new OperationCompleteEvent(); + + return mxEvent; + } + + public MxEvent CreateOnBufferedDataChange( + string sessionId, + int serverHandle, + int itemHandle, + int rawDataType, + object? value, + object? quality, + object? timestamp, + Array? statuses) + { + MxDataType dataType = MapMxDataType(rawDataType); + MxEvent mxEvent = CreateBaseEvent( + MxEventFamily.OnBufferedDataChange, + sessionId, + serverHandle, + itemHandle, + statuses); + mxEvent.Value = variantConverter.Convert(value, dataType); + mxEvent.OnBufferedDataChange = new OnBufferedDataChangeEvent + { + DataType = dataType, + RawDataType = rawDataType, + QualityValues = ConvertBufferedArray(quality, MxDataType.Integer), + TimestampValues = ConvertBufferedArray(timestamp, MxDataType.Time), + }; + + return mxEvent; + } + + public static MxDataType MapMxDataType(int rawDataType) + { + return rawDataType switch + { + -1 => MxDataType.Unknown, + 0 => MxDataType.NoData, + 1 => MxDataType.Boolean, + 2 => MxDataType.Integer, + 3 => MxDataType.Float, + 4 => MxDataType.Double, + 5 => MxDataType.String, + 6 => MxDataType.Time, + 7 => MxDataType.ElapsedTime, + 8 => MxDataType.ReferenceType, + 9 => MxDataType.StatusType, + 10 => MxDataType.Enum, + 11 => MxDataType.SecurityClassificationEnum, + 12 => MxDataType.DataQualityType, + 13 => MxDataType.QualifiedEnum, + 14 => MxDataType.QualifiedStruct, + 15 => MxDataType.InternationalizedString, + 16 => MxDataType.BigString, + 17 => MxDataType.End, + _ => MxDataType.Unknown, + }; + } + + private MxEvent CreateBaseEvent( + MxEventFamily family, + string sessionId, + int serverHandle, + int itemHandle, + Array? statuses) + { + MxEvent mxEvent = new() + { + Family = family, + SessionId = sessionId ?? string.Empty, + ServerHandle = serverHandle, + ItemHandle = itemHandle, + }; + mxEvent.Statuses.Add(statusProxyConverter.ConvertMany(statuses)); + + return mxEvent; + } + + private void ApplySourceTimestamp( + MxEvent mxEvent, + object? timestamp) + { + MxValue convertedTimestamp = variantConverter.Convert(timestamp, MxDataType.Time); + if (convertedTimestamp.KindCase == MxValue.KindOneofCase.TimestampValue) + { + mxEvent.SourceTimestamp = convertedTimestamp.TimestampValue; + return; + } + + if (!string.IsNullOrWhiteSpace(convertedTimestamp.RawDiagnostic)) + { + mxEvent.RawStatus = string.IsNullOrWhiteSpace(mxEvent.RawStatus) + ? convertedTimestamp.RawDiagnostic + : $"{mxEvent.RawStatus}; {convertedTimestamp.RawDiagnostic}"; + } + } + + private MxArray ConvertBufferedArray( + object? value, + MxDataType expectedElementDataType) + { + if (value is Array array) + { + return variantConverter.ConvertArray(array, expectedElementDataType); + } + + MxValue converted = variantConverter.Convert(value, expectedElementDataType); + if (converted.KindCase == MxValue.KindOneofCase.ArrayValue) + { + return converted.ArrayValue; + } + + MxArray mxArray = new() + { + ElementDataType = converted.DataType, + VariantType = converted.VariantType, + RawElementDataType = converted.RawDataType, + RawDiagnostic = string.IsNullOrWhiteSpace(converted.RawDiagnostic) + ? "Buffered MXAccess event argument was not a SAFEARRAY." + : converted.RawDiagnostic, + }; + + switch (converted.KindCase) + { + case MxValue.KindOneofCase.Int32Value: + mxArray.Int32Values = new Int32Array(); + mxArray.Int32Values.Values.Add(converted.Int32Value); + break; + + case MxValue.KindOneofCase.Int64Value: + mxArray.Int64Values = new Int64Array(); + mxArray.Int64Values.Values.Add(converted.Int64Value); + break; + + case MxValue.KindOneofCase.TimestampValue: + mxArray.TimestampValues = new TimestampArray(); + mxArray.TimestampValues.Values.Add(converted.TimestampValue); + break; + } + + return mxArray; + } +} diff --git a/src/MxGateway.Worker/MxAccess/MxAccessEventQueue.cs b/src/MxGateway.Worker/MxAccess/MxAccessEventQueue.cs new file mode 100644 index 0000000..6482a76 --- /dev/null +++ b/src/MxGateway.Worker/MxAccess/MxAccessEventQueue.cs @@ -0,0 +1,180 @@ +using System; +using System.Collections.Generic; +using Google.Protobuf.WellKnownTypes; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Worker.MxAccess; + +public sealed class MxAccessEventQueue +{ + public const int DefaultCapacity = 10000; + + private readonly int capacity; + private readonly Queue events; + private readonly object syncRoot = new(); + private ulong lastEventSequence; + private WorkerFault? fault; + + public MxAccessEventQueue() + : this(DefaultCapacity) + { + } + + public MxAccessEventQueue(int capacity) + { + if (capacity <= 0) + { + throw new ArgumentOutOfRangeException( + nameof(capacity), + "MXAccess event queue capacity must be greater than zero."); + } + + this.capacity = capacity; + events = new Queue(capacity); + } + + public int Capacity => capacity; + + public int Count + { + get + { + lock (syncRoot) + { + return events.Count; + } + } + } + + public ulong LastEventSequence + { + get + { + lock (syncRoot) + { + return lastEventSequence; + } + } + } + + public bool IsFaulted + { + get + { + lock (syncRoot) + { + return fault is not null; + } + } + } + + public WorkerFault? Fault + { + get + { + lock (syncRoot) + { + return fault?.Clone(); + } + } + } + + public WorkerEvent Enqueue(MxEvent mxEvent) + { + if (mxEvent is null) + { + throw new ArgumentNullException(nameof(mxEvent)); + } + + lock (syncRoot) + { + if (fault is not null) + { + throw new InvalidOperationException("MXAccess outbound event queue is faulted."); + } + + if (events.Count >= capacity) + { + fault = CreateOverflowFault(); + throw new MxAccessEventQueueOverflowException(capacity); + } + + MxEvent queuedEvent = mxEvent.Clone(); + queuedEvent.WorkerSequence = ++lastEventSequence; + queuedEvent.WorkerTimestamp = Timestamp.FromDateTime(DateTime.UtcNow); + + WorkerEvent workerEvent = new() + { + Event = queuedEvent, + }; + events.Enqueue(workerEvent); + + return workerEvent.Clone(); + } + } + + public bool TryDequeue(out WorkerEvent? workerEvent) + { + lock (syncRoot) + { + if (events.Count == 0) + { + workerEvent = null; + return false; + } + + workerEvent = events.Dequeue().Clone(); + return true; + } + } + + public IReadOnlyList Drain(uint maxEvents) + { + lock (syncRoot) + { + int drainCount = maxEvents == 0 + ? events.Count + : Math.Min(events.Count, checked((int)Math.Min(maxEvents, int.MaxValue))); + if (drainCount == 0) + { + return Array.Empty(); + } + + List drained = new(drainCount); + for (int index = 0; index < drainCount; index++) + { + drained.Add(events.Dequeue().Clone()); + } + + return drained; + } + } + + public void RecordFault(WorkerFault workerFault) + { + if (workerFault is null) + { + throw new ArgumentNullException(nameof(workerFault)); + } + + lock (syncRoot) + { + fault ??= workerFault.Clone(); + } + } + + private WorkerFault CreateOverflowFault() + { + string message = $"MXAccess outbound event queue reached capacity {capacity}."; + return new WorkerFault + { + Category = WorkerFaultCategory.QueueOverflow, + DiagnosticMessage = message, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.WorkerUnavailable, + Message = message, + }, + }; + } +} diff --git a/src/MxGateway.Worker/MxAccess/MxAccessEventQueueOverflowException.cs b/src/MxGateway.Worker/MxAccess/MxAccessEventQueueOverflowException.cs new file mode 100644 index 0000000..e00de4a --- /dev/null +++ b/src/MxGateway.Worker/MxAccess/MxAccessEventQueueOverflowException.cs @@ -0,0 +1,14 @@ +using System; + +namespace MxGateway.Worker.MxAccess; + +public sealed class MxAccessEventQueueOverflowException : Exception +{ + public MxAccessEventQueueOverflowException(int capacity) + : base($"MXAccess outbound event queue reached its configured capacity of {capacity}.") + { + Capacity = capacity; + } + + public int Capacity { get; } +} diff --git a/src/MxGateway.Worker/MxAccess/MxAccessSession.cs b/src/MxGateway.Worker/MxAccess/MxAccessSession.cs index b4bc984..b96bd6f 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessSession.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessSession.cs @@ -44,7 +44,8 @@ public sealed class MxAccessSession : IDisposable public static MxAccessSession Create( IMxAccessComObjectFactory factory, - IMxAccessEventSink eventSink) + IMxAccessEventSink eventSink, + string sessionId) { if (factory is null) { @@ -66,7 +67,7 @@ public sealed class MxAccessSession : IDisposable throw new InvalidOperationException("MXAccess COM factory returned null."); } - eventSink.Attach(mxAccessComObject); + eventSink.Attach(mxAccessComObject, sessionId); return new MxAccessSession( mxAccessComObject, diff --git a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs index 6fc9757..f81857b 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs @@ -11,6 +11,7 @@ public sealed class MxAccessStaSession : IDisposable { private readonly IMxAccessComObjectFactory factory; private readonly IMxAccessEventSink eventSink; + private readonly MxAccessEventQueue eventQueue; private readonly StaRuntime staRuntime; private StaCommandDispatcher? commandDispatcher; private MxAccessSession? session; @@ -20,7 +21,7 @@ public sealed class MxAccessStaSession : IDisposable : this( new StaRuntime(), new MxAccessComObjectFactory(), - new MxAccessBaseEventSink()) + new MxAccessEventQueue()) { } @@ -28,13 +29,41 @@ public sealed class MxAccessStaSession : IDisposable StaRuntime staRuntime, IMxAccessComObjectFactory factory, IMxAccessEventSink eventSink) + : this(staRuntime, factory, eventSink, new MxAccessEventQueue()) + { + } + + public MxAccessStaSession( + StaRuntime staRuntime, + IMxAccessComObjectFactory factory, + MxAccessEventQueue eventQueue) + : this(staRuntime, factory, new MxAccessBaseEventSink(eventQueue), eventQueue) + { + } + + public MxAccessStaSession( + StaRuntime staRuntime, + IMxAccessComObjectFactory factory, + IMxAccessEventSink eventSink, + MxAccessEventQueue eventQueue) { this.staRuntime = staRuntime ?? throw new ArgumentNullException(nameof(staRuntime)); this.factory = factory ?? throw new ArgumentNullException(nameof(factory)); this.eventSink = eventSink ?? throw new ArgumentNullException(nameof(eventSink)); + this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue)); + } + + public MxAccessEventQueue EventQueue => eventQueue; + + public Task StartAsync( + int workerProcessId, + CancellationToken cancellationToken = default) + { + return StartAsync(string.Empty, workerProcessId, cancellationToken); } public Task StartAsync( + string sessionId, int workerProcessId, CancellationToken cancellationToken = default) { @@ -48,7 +77,7 @@ public sealed class MxAccessStaSession : IDisposable throw new InvalidOperationException("MXAccess COM session has already been created."); } - session = MxAccessSession.Create(factory, eventSink); + session = MxAccessSession.Create(factory, eventSink, sessionId); commandDispatcher = new StaCommandDispatcher( staRuntime, new MxAccessCommandExecutor(session)); @@ -68,6 +97,11 @@ public sealed class MxAccessStaSession : IDisposable return commandDispatcher.DispatchAsync(command); } + public IReadOnlyList DrainEvents(uint maxEvents) + { + return eventQueue.Drain(maxEvents); + } + public Task> GetRegisteredServerHandlesAsync( CancellationToken cancellationToken = default) { -- 2.52.0