Implement Event Sink And Event Queue #83
@@ -277,6 +277,8 @@ Live tests:
|
|||||||
|
|
||||||
Labels: `area:worker`, `type:feature`, `priority:p0`
|
Labels: `area:worker`, `type:feature`, `priority:p0`
|
||||||
|
|
||||||
|
Status: implemented.
|
||||||
|
|
||||||
Deliverables:
|
Deliverables:
|
||||||
|
|
||||||
- handlers for `OnDataChange`,
|
- handlers for `OnDataChange`,
|
||||||
|
|||||||
@@ -348,9 +348,28 @@ Event handling rules:
|
|||||||
- Enqueue to the outbound event queue.
|
- Enqueue to the outbound event queue.
|
||||||
- Return quickly to preserve message pumping.
|
- Return quickly to preserve message pumping.
|
||||||
|
|
||||||
If event conversion throws, catch it inside the event handler, enqueue a
|
`MxAccessBaseEventSink` implements the COM connection-point handlers and keeps
|
||||||
structured `WorkerFault` or diagnostic event, and keep the worker alive only if
|
the handlers limited to event argument conversion plus enqueue. It uses
|
||||||
the fault policy allows it.
|
`MxAccessEventMapper` to create `MxEvent` DTOs for `OnDataChange`,
|
||||||
|
`OnWriteComplete`, `OperationComplete`, and `OnBufferedDataChange`. The mapper
|
||||||
|
converts scalar and array values through `VariantConverter`, converts
|
||||||
|
`MXSTATUS_PROXY[]` through `MxStatusProxyConverter`, and maps installed
|
||||||
|
`MxDataType` values to the public protobuf enum while preserving the raw data
|
||||||
|
type on buffered events. `OperationComplete` is only emitted from the native
|
||||||
|
`OperationComplete` handler; write completion does not synthesize it.
|
||||||
|
|
||||||
|
`MxAccessEventQueue` is the bounded outbound event queue for one worker
|
||||||
|
session. It assigns the monotonic `WorkerSequence` and `WorkerTimestamp` when an
|
||||||
|
event is accepted, preserving the order in which MXAccess handlers enqueue
|
||||||
|
events. The default capacity is `10000`. When the queue reaches capacity it
|
||||||
|
records a `WorkerFaultCategory.QueueOverflow` fault and rejects further events.
|
||||||
|
The event handler catches conversion and enqueue failures, records the first
|
||||||
|
fault on the queue, and returns to the STA message pump instead of writing to
|
||||||
|
the pipe.
|
||||||
|
|
||||||
|
If event conversion throws, catch it inside the event handler, record a
|
||||||
|
structured `WorkerFault`, and keep the worker alive only if the fault policy
|
||||||
|
allows it.
|
||||||
|
|
||||||
## Command Queue
|
## Command Queue
|
||||||
|
|
||||||
|
|||||||
@@ -842,7 +842,9 @@ public sealed class MxAccessCommandExecutorTests
|
|||||||
|
|
||||||
private sealed class NoopEventSink : IMxAccessEventSink
|
private sealed class NoopEventSink : IMxAccessEventSink
|
||||||
{
|
{
|
||||||
public void Attach(object mxAccessComObject)
|
public void Attach(
|
||||||
|
object mxAccessComObject,
|
||||||
|
string sessionId)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,117 @@
|
|||||||
|
using System;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Worker.MxAccess;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Tests.MxAccess;
|
||||||
|
|
||||||
|
public sealed class MxAccessEventMapperTests
|
||||||
|
{
|
||||||
|
private readonly MxAccessEventMapper mapper = new();
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void CreateOnDataChange_ConvertsValueTimestampQualityAndStatuses()
|
||||||
|
{
|
||||||
|
DateTime timestamp = new(2026, 4, 26, 12, 30, 0, DateTimeKind.Utc);
|
||||||
|
FakeStatus[] statuses =
|
||||||
|
{
|
||||||
|
new()
|
||||||
|
{
|
||||||
|
success = -1,
|
||||||
|
category = 0,
|
||||||
|
detectedBy = 5,
|
||||||
|
detail = 0,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
MxEvent mxEvent = mapper.CreateOnDataChange(
|
||||||
|
"session-1",
|
||||||
|
serverHandle: 12,
|
||||||
|
itemHandle: 34,
|
||||||
|
value: 42,
|
||||||
|
quality: 192,
|
||||||
|
timestamp: timestamp,
|
||||||
|
statuses: statuses);
|
||||||
|
|
||||||
|
Assert.Equal(MxEventFamily.OnDataChange, mxEvent.Family);
|
||||||
|
Assert.Equal("session-1", mxEvent.SessionId);
|
||||||
|
Assert.Equal(12, mxEvent.ServerHandle);
|
||||||
|
Assert.Equal(34, mxEvent.ItemHandle);
|
||||||
|
Assert.Equal(42, mxEvent.Value.Int32Value);
|
||||||
|
Assert.Equal(192, mxEvent.Quality);
|
||||||
|
Assert.Equal(timestamp, mxEvent.SourceTimestamp.ToDateTime());
|
||||||
|
Assert.Equal(MxEvent.BodyOneofCase.OnDataChange, mxEvent.BodyCase);
|
||||||
|
|
||||||
|
MxStatusProxy status = Assert.Single(mxEvent.Statuses);
|
||||||
|
Assert.Equal(-1, status.Success);
|
||||||
|
Assert.Equal(MxStatusCategory.Ok, status.Category);
|
||||||
|
Assert.Equal(MxStatusSource.RespondingAutomationObject, status.DetectedBy);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void CreateOnWriteCompleteAndOperationComplete_PreservesDistinctFamilies()
|
||||||
|
{
|
||||||
|
MxEvent writeComplete = mapper.CreateOnWriteComplete(
|
||||||
|
"session-1",
|
||||||
|
serverHandle: 1,
|
||||||
|
itemHandle: 2,
|
||||||
|
statuses: Array.Empty<FakeStatus>());
|
||||||
|
MxEvent operationComplete = mapper.CreateOperationComplete(
|
||||||
|
"session-1",
|
||||||
|
serverHandle: 1,
|
||||||
|
itemHandle: 2,
|
||||||
|
statuses: Array.Empty<FakeStatus>());
|
||||||
|
|
||||||
|
Assert.Equal(MxEventFamily.OnWriteComplete, writeComplete.Family);
|
||||||
|
Assert.Equal(MxEvent.BodyOneofCase.OnWriteComplete, writeComplete.BodyCase);
|
||||||
|
Assert.Equal(MxEventFamily.OperationComplete, operationComplete.Family);
|
||||||
|
Assert.Equal(MxEvent.BodyOneofCase.OperationComplete, operationComplete.BodyCase);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void CreateOnBufferedDataChange_PreservesRawDataTypeAndArrayMetadata()
|
||||||
|
{
|
||||||
|
DateTime firstTimestamp = new(2026, 4, 26, 13, 0, 0, DateTimeKind.Utc);
|
||||||
|
DateTime secondTimestamp = new(2026, 4, 26, 13, 1, 0, DateTimeKind.Utc);
|
||||||
|
|
||||||
|
MxEvent mxEvent = mapper.CreateOnBufferedDataChange(
|
||||||
|
"session-1",
|
||||||
|
serverHandle: 10,
|
||||||
|
itemHandle: 20,
|
||||||
|
rawDataType: 2,
|
||||||
|
value: new[] { 7, 8 },
|
||||||
|
quality: new[] { 192, 0 },
|
||||||
|
timestamp: new[] { firstTimestamp, secondTimestamp },
|
||||||
|
statuses: null);
|
||||||
|
|
||||||
|
Assert.Equal(MxEventFamily.OnBufferedDataChange, mxEvent.Family);
|
||||||
|
Assert.Equal(MxDataType.Integer, mxEvent.OnBufferedDataChange.DataType);
|
||||||
|
Assert.Equal(2, mxEvent.OnBufferedDataChange.RawDataType);
|
||||||
|
Assert.Equal(MxDataType.Integer, mxEvent.Value.ArrayValue.ElementDataType);
|
||||||
|
Assert.Equal(new[] { 7, 8 }, mxEvent.Value.ArrayValue.Int32Values.Values);
|
||||||
|
Assert.Equal(new[] { 192, 0 }, mxEvent.OnBufferedDataChange.QualityValues.Int32Values.Values);
|
||||||
|
Assert.Equal(2, mxEvent.OnBufferedDataChange.TimestampValues.TimestampValues.Values.Count);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Theory]
|
||||||
|
[InlineData(-1, MxDataType.Unknown)]
|
||||||
|
[InlineData(0, MxDataType.NoData)]
|
||||||
|
[InlineData(1, MxDataType.Boolean)]
|
||||||
|
[InlineData(2, MxDataType.Integer)]
|
||||||
|
[InlineData(6, MxDataType.Time)]
|
||||||
|
[InlineData(15, MxDataType.InternationalizedString)]
|
||||||
|
[InlineData(999, MxDataType.Unknown)]
|
||||||
|
public void MapMxDataType_MapsInstalledMxAccessValues(
|
||||||
|
int rawDataType,
|
||||||
|
MxDataType expectedDataType)
|
||||||
|
{
|
||||||
|
Assert.Equal(expectedDataType, MxAccessEventMapper.MapMxDataType(rawDataType));
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeStatus
|
||||||
|
{
|
||||||
|
public int success;
|
||||||
|
public int category;
|
||||||
|
public int detectedBy;
|
||||||
|
public int detail;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,106 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Worker.MxAccess;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Tests.MxAccess;
|
||||||
|
|
||||||
|
public sealed class MxAccessEventQueueTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void Enqueue_AssignsMonotonicWorkerSequencesAndPreservesOrder()
|
||||||
|
{
|
||||||
|
MxAccessEventQueue queue = new(capacity: 4);
|
||||||
|
|
||||||
|
WorkerEvent first = queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10));
|
||||||
|
WorkerEvent second = queue.Enqueue(CreateEvent(MxEventFamily.OnWriteComplete, itemHandle: 11));
|
||||||
|
|
||||||
|
Assert.Equal(1UL, first.Event.WorkerSequence);
|
||||||
|
Assert.Equal(2UL, second.Event.WorkerSequence);
|
||||||
|
Assert.NotNull(first.Event.WorkerTimestamp);
|
||||||
|
Assert.Equal(2, queue.Count);
|
||||||
|
Assert.Equal(2UL, queue.LastEventSequence);
|
||||||
|
|
||||||
|
Assert.True(queue.TryDequeue(out WorkerEvent? dequeuedFirst));
|
||||||
|
Assert.True(queue.TryDequeue(out WorkerEvent? dequeuedSecond));
|
||||||
|
Assert.Equal(10, dequeuedFirst?.Event.ItemHandle);
|
||||||
|
Assert.Equal(11, dequeuedSecond?.Event.ItemHandle);
|
||||||
|
Assert.False(queue.TryDequeue(out _));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Drain_RemovesAtMostRequestedEvents()
|
||||||
|
{
|
||||||
|
MxAccessEventQueue queue = new(capacity: 4);
|
||||||
|
queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10));
|
||||||
|
queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 11));
|
||||||
|
queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 12));
|
||||||
|
|
||||||
|
IReadOnlyList<WorkerEvent> drained = queue.Drain(maxEvents: 2);
|
||||||
|
|
||||||
|
Assert.Equal(2, drained.Count);
|
||||||
|
Assert.Equal(10, drained[0].Event.ItemHandle);
|
||||||
|
Assert.Equal(11, drained[1].Event.ItemHandle);
|
||||||
|
Assert.Equal(1, queue.Count);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Enqueue_WhenCapacityIsExceeded_RecordsOverflowFaultAndRejectsNewEvents()
|
||||||
|
{
|
||||||
|
MxAccessEventQueue queue = new(capacity: 1);
|
||||||
|
queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 10));
|
||||||
|
|
||||||
|
MxAccessEventQueueOverflowException overflow = Assert.Throws<MxAccessEventQueueOverflowException>(
|
||||||
|
() => queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 11)));
|
||||||
|
|
||||||
|
Assert.Equal(1, overflow.Capacity);
|
||||||
|
Assert.True(queue.IsFaulted);
|
||||||
|
Assert.Equal(WorkerFaultCategory.QueueOverflow, queue.Fault?.Category);
|
||||||
|
Assert.Equal(ProtocolStatusCode.WorkerUnavailable, queue.Fault?.ProtocolStatus.Code);
|
||||||
|
Assert.Throws<InvalidOperationException>(
|
||||||
|
() => queue.Enqueue(CreateEvent(MxEventFamily.OnDataChange, itemHandle: 12)));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void RecordFault_KeepsFirstFault()
|
||||||
|
{
|
||||||
|
MxAccessEventQueue queue = new(capacity: 1);
|
||||||
|
queue.RecordFault(new WorkerFault
|
||||||
|
{
|
||||||
|
Category = WorkerFaultCategory.MxaccessEventConversionFailed,
|
||||||
|
});
|
||||||
|
queue.RecordFault(new WorkerFault
|
||||||
|
{
|
||||||
|
Category = WorkerFaultCategory.QueueOverflow,
|
||||||
|
});
|
||||||
|
|
||||||
|
Assert.True(queue.IsFaulted);
|
||||||
|
Assert.Equal(WorkerFaultCategory.MxaccessEventConversionFailed, queue.Fault?.Category);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MxEvent CreateEvent(
|
||||||
|
MxEventFamily family,
|
||||||
|
int itemHandle)
|
||||||
|
{
|
||||||
|
MxEvent mxEvent = new()
|
||||||
|
{
|
||||||
|
Family = family,
|
||||||
|
SessionId = "session-1",
|
||||||
|
ServerHandle = 1,
|
||||||
|
ItemHandle = itemHandle,
|
||||||
|
};
|
||||||
|
|
||||||
|
switch (family)
|
||||||
|
{
|
||||||
|
case MxEventFamily.OnWriteComplete:
|
||||||
|
mxEvent.OnWriteComplete = new OnWriteCompleteEvent();
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
mxEvent.OnDataChange = new OnDataChangeEvent();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return mxEvent;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -18,7 +18,7 @@ public sealed class MxAccessStaSessionTests
|
|||||||
using StaRuntime runtime = CreateRuntime();
|
using StaRuntime runtime = CreateRuntime();
|
||||||
using MxAccessStaSession session = new(runtime, factory, eventSink);
|
using MxAccessStaSession session = new(runtime, factory, eventSink);
|
||||||
|
|
||||||
WorkerReady ready = await session.StartAsync(workerProcessId: 1234);
|
WorkerReady ready = await session.StartAsync("session-1", workerProcessId: 1234);
|
||||||
|
|
||||||
Assert.Equal(1234, ready.WorkerProcessId);
|
Assert.Equal(1234, ready.WorkerProcessId);
|
||||||
Assert.Equal(MxAccessInteropInfo.ProgId, ready.MxaccessProgid);
|
Assert.Equal(MxAccessInteropInfo.ProgId, ready.MxaccessProgid);
|
||||||
@@ -28,6 +28,7 @@ public sealed class MxAccessStaSessionTests
|
|||||||
Assert.Equal(runtime.StaThreadId, eventSink.AttachThreadId);
|
Assert.Equal(runtime.StaThreadId, eventSink.AttachThreadId);
|
||||||
Assert.Equal(ApartmentState.STA, factory.CreateApartmentState);
|
Assert.Equal(ApartmentState.STA, factory.CreateApartmentState);
|
||||||
Assert.Same(factory.CreatedObject, eventSink.AttachedObject);
|
Assert.Same(factory.CreatedObject, eventSink.AttachedObject);
|
||||||
|
Assert.Equal("session-1", eventSink.SessionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
@@ -107,10 +108,15 @@ public sealed class MxAccessStaSessionTests
|
|||||||
|
|
||||||
public int? DetachThreadId { get; private set; }
|
public int? DetachThreadId { get; private set; }
|
||||||
|
|
||||||
public void Attach(object mxAccessComObject)
|
public string? SessionId { get; private set; }
|
||||||
|
|
||||||
|
public void Attach(
|
||||||
|
object mxAccessComObject,
|
||||||
|
string sessionId)
|
||||||
{
|
{
|
||||||
AttachedObject = mxAccessComObject;
|
AttachedObject = mxAccessComObject;
|
||||||
AttachThreadId = Thread.CurrentThread.ManagedThreadId;
|
AttachThreadId = Thread.CurrentThread.ManagedThreadId;
|
||||||
|
SessionId = sessionId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Detach()
|
public void Detach()
|
||||||
|
|||||||
@@ -235,7 +235,7 @@ public sealed class WorkerPipeSession
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
return await _mxAccessStaSession
|
return await _mxAccessStaSession
|
||||||
.StartAsync(_processIdProvider(), cancellationToken)
|
.StartAsync(_options.SessionId, _processIdProvider(), cancellationToken)
|
||||||
.ConfigureAwait(false);
|
.ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
catch
|
catch
|
||||||
|
|||||||
@@ -2,7 +2,9 @@ namespace MxGateway.Worker.MxAccess;
|
|||||||
|
|
||||||
public interface IMxAccessEventSink
|
public interface IMxAccessEventSink
|
||||||
{
|
{
|
||||||
void Attach(object mxAccessComObject);
|
void Attach(
|
||||||
|
object mxAccessComObject,
|
||||||
|
string sessionId);
|
||||||
|
|
||||||
void Detach();
|
void Detach();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,13 +1,39 @@
|
|||||||
|
using System;
|
||||||
using ArchestrA.MxAccess;
|
using ArchestrA.MxAccess;
|
||||||
|
using Proto = MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
namespace MxGateway.Worker.MxAccess;
|
namespace MxGateway.Worker.MxAccess;
|
||||||
|
|
||||||
public sealed class MxAccessBaseEventSink : IMxAccessEventSink
|
public sealed class MxAccessBaseEventSink : IMxAccessEventSink
|
||||||
{
|
{
|
||||||
|
private readonly MxAccessEventMapper eventMapper;
|
||||||
|
private readonly MxAccessEventQueue eventQueue;
|
||||||
private LMXProxyServerClass? server;
|
private LMXProxyServerClass? server;
|
||||||
|
private string sessionId = string.Empty;
|
||||||
|
|
||||||
public void Attach(object mxAccessComObject)
|
public MxAccessBaseEventSink()
|
||||||
|
: this(new MxAccessEventQueue())
|
||||||
{
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public MxAccessBaseEventSink(MxAccessEventQueue eventQueue)
|
||||||
|
: this(eventQueue, new MxAccessEventMapper())
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public MxAccessBaseEventSink(
|
||||||
|
MxAccessEventQueue eventQueue,
|
||||||
|
MxAccessEventMapper eventMapper)
|
||||||
|
{
|
||||||
|
this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue));
|
||||||
|
this.eventMapper = eventMapper ?? throw new ArgumentNullException(nameof(eventMapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Attach(
|
||||||
|
object mxAccessComObject,
|
||||||
|
string sessionId)
|
||||||
|
{
|
||||||
|
this.sessionId = sessionId ?? string.Empty;
|
||||||
server = (LMXProxyServerClass)mxAccessComObject;
|
server = (LMXProxyServerClass)mxAccessComObject;
|
||||||
server.OnDataChange += OnDataChange;
|
server.OnDataChange += OnDataChange;
|
||||||
server.OnWriteComplete += OnWriteComplete;
|
server.OnWriteComplete += OnWriteComplete;
|
||||||
@@ -27,9 +53,10 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
|
|||||||
server.OperationComplete -= OperationComplete;
|
server.OperationComplete -= OperationComplete;
|
||||||
server.OnBufferedDataChange -= OnBufferedDataChange;
|
server.OnBufferedDataChange -= OnBufferedDataChange;
|
||||||
server = null;
|
server = null;
|
||||||
|
sessionId = string.Empty;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void OnDataChange(
|
private void OnDataChange(
|
||||||
int hLMXServerHandle,
|
int hLMXServerHandle,
|
||||||
int phItemHandle,
|
int phItemHandle,
|
||||||
object pvItemValue,
|
object pvItemValue,
|
||||||
@@ -37,23 +64,44 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
|
|||||||
object pftItemTimeStamp,
|
object pftItemTimeStamp,
|
||||||
ref MXSTATUS_PROXY[] pVars)
|
ref MXSTATUS_PROXY[] pVars)
|
||||||
{
|
{
|
||||||
|
MXSTATUS_PROXY[] statuses = pVars;
|
||||||
|
EnqueueEvent(() => eventMapper.CreateOnDataChange(
|
||||||
|
sessionId,
|
||||||
|
hLMXServerHandle,
|
||||||
|
phItemHandle,
|
||||||
|
pvItemValue,
|
||||||
|
pwItemQuality,
|
||||||
|
pftItemTimeStamp,
|
||||||
|
statuses));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void OnWriteComplete(
|
private void OnWriteComplete(
|
||||||
int hLMXServerHandle,
|
int hLMXServerHandle,
|
||||||
int phItemHandle,
|
int phItemHandle,
|
||||||
ref MXSTATUS_PROXY[] pVars)
|
ref MXSTATUS_PROXY[] pVars)
|
||||||
{
|
{
|
||||||
|
MXSTATUS_PROXY[] statuses = pVars;
|
||||||
|
EnqueueEvent(() => eventMapper.CreateOnWriteComplete(
|
||||||
|
sessionId,
|
||||||
|
hLMXServerHandle,
|
||||||
|
phItemHandle,
|
||||||
|
statuses));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void OperationComplete(
|
private void OperationComplete(
|
||||||
int hLMXServerHandle,
|
int hLMXServerHandle,
|
||||||
int phItemHandle,
|
int phItemHandle,
|
||||||
ref MXSTATUS_PROXY[] pVars)
|
ref MXSTATUS_PROXY[] pVars)
|
||||||
{
|
{
|
||||||
|
MXSTATUS_PROXY[] statuses = pVars;
|
||||||
|
EnqueueEvent(() => eventMapper.CreateOperationComplete(
|
||||||
|
sessionId,
|
||||||
|
hLMXServerHandle,
|
||||||
|
phItemHandle,
|
||||||
|
statuses));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void OnBufferedDataChange(
|
private void OnBufferedDataChange(
|
||||||
int hLMXServerHandle,
|
int hLMXServerHandle,
|
||||||
int phItemHandle,
|
int phItemHandle,
|
||||||
MxDataType dtDataType,
|
MxDataType dtDataType,
|
||||||
@@ -62,5 +110,42 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
|
|||||||
object pftItemTimeStamp,
|
object pftItemTimeStamp,
|
||||||
ref MXSTATUS_PROXY[] pVars)
|
ref MXSTATUS_PROXY[] pVars)
|
||||||
{
|
{
|
||||||
|
MXSTATUS_PROXY[] statuses = pVars;
|
||||||
|
EnqueueEvent(() => eventMapper.CreateOnBufferedDataChange(
|
||||||
|
sessionId,
|
||||||
|
hLMXServerHandle,
|
||||||
|
phItemHandle,
|
||||||
|
(int)dtDataType,
|
||||||
|
pvItemValue,
|
||||||
|
pwItemQuality,
|
||||||
|
pftItemTimeStamp,
|
||||||
|
statuses));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void EnqueueEvent(Func<Proto.MxEvent> createEvent)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
eventQueue.Enqueue(createEvent());
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
eventQueue.RecordFault(CreateEventConversionFault(exception));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Proto.WorkerFault CreateEventConversionFault(Exception exception)
|
||||||
|
{
|
||||||
|
return new Proto.WorkerFault
|
||||||
|
{
|
||||||
|
Category = Proto.WorkerFaultCategory.MxaccessEventConversionFailed,
|
||||||
|
ExceptionType = exception.GetType().FullName ?? string.Empty,
|
||||||
|
DiagnosticMessage = $"{exception.GetType().FullName}: HRESULT 0x{unchecked((uint)exception.HResult):X8}",
|
||||||
|
ProtocolStatus = new Proto.ProtocolStatus
|
||||||
|
{
|
||||||
|
Code = Proto.ProtocolStatusCode.MxaccessFailure,
|
||||||
|
Message = "MXAccess event conversion failed.",
|
||||||
|
},
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,221 @@
|
|||||||
|
using System;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Worker.Conversion;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.MxAccess;
|
||||||
|
|
||||||
|
public sealed class MxAccessEventMapper
|
||||||
|
{
|
||||||
|
private readonly VariantConverter variantConverter;
|
||||||
|
private readonly MxStatusProxyConverter statusProxyConverter;
|
||||||
|
|
||||||
|
public MxAccessEventMapper()
|
||||||
|
: this(new VariantConverter(), new MxStatusProxyConverter())
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public MxAccessEventMapper(
|
||||||
|
VariantConverter variantConverter,
|
||||||
|
MxStatusProxyConverter statusProxyConverter)
|
||||||
|
{
|
||||||
|
this.variantConverter = variantConverter ?? throw new ArgumentNullException(nameof(variantConverter));
|
||||||
|
this.statusProxyConverter = statusProxyConverter ?? throw new ArgumentNullException(nameof(statusProxyConverter));
|
||||||
|
}
|
||||||
|
|
||||||
|
public MxEvent CreateOnDataChange(
|
||||||
|
string sessionId,
|
||||||
|
int serverHandle,
|
||||||
|
int itemHandle,
|
||||||
|
object? value,
|
||||||
|
int quality,
|
||||||
|
object? timestamp,
|
||||||
|
Array? statuses)
|
||||||
|
{
|
||||||
|
MxEvent mxEvent = CreateBaseEvent(
|
||||||
|
MxEventFamily.OnDataChange,
|
||||||
|
sessionId,
|
||||||
|
serverHandle,
|
||||||
|
itemHandle,
|
||||||
|
statuses);
|
||||||
|
mxEvent.Value = variantConverter.Convert(value);
|
||||||
|
mxEvent.Quality = quality;
|
||||||
|
ApplySourceTimestamp(mxEvent, timestamp);
|
||||||
|
mxEvent.OnDataChange = new OnDataChangeEvent();
|
||||||
|
|
||||||
|
return mxEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MxEvent CreateOnWriteComplete(
|
||||||
|
string sessionId,
|
||||||
|
int serverHandle,
|
||||||
|
int itemHandle,
|
||||||
|
Array? statuses)
|
||||||
|
{
|
||||||
|
MxEvent mxEvent = CreateBaseEvent(
|
||||||
|
MxEventFamily.OnWriteComplete,
|
||||||
|
sessionId,
|
||||||
|
serverHandle,
|
||||||
|
itemHandle,
|
||||||
|
statuses);
|
||||||
|
mxEvent.OnWriteComplete = new OnWriteCompleteEvent();
|
||||||
|
|
||||||
|
return mxEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MxEvent CreateOperationComplete(
|
||||||
|
string sessionId,
|
||||||
|
int serverHandle,
|
||||||
|
int itemHandle,
|
||||||
|
Array? statuses)
|
||||||
|
{
|
||||||
|
MxEvent mxEvent = CreateBaseEvent(
|
||||||
|
MxEventFamily.OperationComplete,
|
||||||
|
sessionId,
|
||||||
|
serverHandle,
|
||||||
|
itemHandle,
|
||||||
|
statuses);
|
||||||
|
mxEvent.OperationComplete = new OperationCompleteEvent();
|
||||||
|
|
||||||
|
return mxEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MxEvent CreateOnBufferedDataChange(
|
||||||
|
string sessionId,
|
||||||
|
int serverHandle,
|
||||||
|
int itemHandle,
|
||||||
|
int rawDataType,
|
||||||
|
object? value,
|
||||||
|
object? quality,
|
||||||
|
object? timestamp,
|
||||||
|
Array? statuses)
|
||||||
|
{
|
||||||
|
MxDataType dataType = MapMxDataType(rawDataType);
|
||||||
|
MxEvent mxEvent = CreateBaseEvent(
|
||||||
|
MxEventFamily.OnBufferedDataChange,
|
||||||
|
sessionId,
|
||||||
|
serverHandle,
|
||||||
|
itemHandle,
|
||||||
|
statuses);
|
||||||
|
mxEvent.Value = variantConverter.Convert(value, dataType);
|
||||||
|
mxEvent.OnBufferedDataChange = new OnBufferedDataChangeEvent
|
||||||
|
{
|
||||||
|
DataType = dataType,
|
||||||
|
RawDataType = rawDataType,
|
||||||
|
QualityValues = ConvertBufferedArray(quality, MxDataType.Integer),
|
||||||
|
TimestampValues = ConvertBufferedArray(timestamp, MxDataType.Time),
|
||||||
|
};
|
||||||
|
|
||||||
|
return mxEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static MxDataType MapMxDataType(int rawDataType)
|
||||||
|
{
|
||||||
|
return rawDataType switch
|
||||||
|
{
|
||||||
|
-1 => MxDataType.Unknown,
|
||||||
|
0 => MxDataType.NoData,
|
||||||
|
1 => MxDataType.Boolean,
|
||||||
|
2 => MxDataType.Integer,
|
||||||
|
3 => MxDataType.Float,
|
||||||
|
4 => MxDataType.Double,
|
||||||
|
5 => MxDataType.String,
|
||||||
|
6 => MxDataType.Time,
|
||||||
|
7 => MxDataType.ElapsedTime,
|
||||||
|
8 => MxDataType.ReferenceType,
|
||||||
|
9 => MxDataType.StatusType,
|
||||||
|
10 => MxDataType.Enum,
|
||||||
|
11 => MxDataType.SecurityClassificationEnum,
|
||||||
|
12 => MxDataType.DataQualityType,
|
||||||
|
13 => MxDataType.QualifiedEnum,
|
||||||
|
14 => MxDataType.QualifiedStruct,
|
||||||
|
15 => MxDataType.InternationalizedString,
|
||||||
|
16 => MxDataType.BigString,
|
||||||
|
17 => MxDataType.End,
|
||||||
|
_ => MxDataType.Unknown,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private MxEvent CreateBaseEvent(
|
||||||
|
MxEventFamily family,
|
||||||
|
string sessionId,
|
||||||
|
int serverHandle,
|
||||||
|
int itemHandle,
|
||||||
|
Array? statuses)
|
||||||
|
{
|
||||||
|
MxEvent mxEvent = new()
|
||||||
|
{
|
||||||
|
Family = family,
|
||||||
|
SessionId = sessionId ?? string.Empty,
|
||||||
|
ServerHandle = serverHandle,
|
||||||
|
ItemHandle = itemHandle,
|
||||||
|
};
|
||||||
|
mxEvent.Statuses.Add(statusProxyConverter.ConvertMany(statuses));
|
||||||
|
|
||||||
|
return mxEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ApplySourceTimestamp(
|
||||||
|
MxEvent mxEvent,
|
||||||
|
object? timestamp)
|
||||||
|
{
|
||||||
|
MxValue convertedTimestamp = variantConverter.Convert(timestamp, MxDataType.Time);
|
||||||
|
if (convertedTimestamp.KindCase == MxValue.KindOneofCase.TimestampValue)
|
||||||
|
{
|
||||||
|
mxEvent.SourceTimestamp = convertedTimestamp.TimestampValue;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!string.IsNullOrWhiteSpace(convertedTimestamp.RawDiagnostic))
|
||||||
|
{
|
||||||
|
mxEvent.RawStatus = string.IsNullOrWhiteSpace(mxEvent.RawStatus)
|
||||||
|
? convertedTimestamp.RawDiagnostic
|
||||||
|
: $"{mxEvent.RawStatus}; {convertedTimestamp.RawDiagnostic}";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private MxArray ConvertBufferedArray(
|
||||||
|
object? value,
|
||||||
|
MxDataType expectedElementDataType)
|
||||||
|
{
|
||||||
|
if (value is Array array)
|
||||||
|
{
|
||||||
|
return variantConverter.ConvertArray(array, expectedElementDataType);
|
||||||
|
}
|
||||||
|
|
||||||
|
MxValue converted = variantConverter.Convert(value, expectedElementDataType);
|
||||||
|
if (converted.KindCase == MxValue.KindOneofCase.ArrayValue)
|
||||||
|
{
|
||||||
|
return converted.ArrayValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
MxArray mxArray = new()
|
||||||
|
{
|
||||||
|
ElementDataType = converted.DataType,
|
||||||
|
VariantType = converted.VariantType,
|
||||||
|
RawElementDataType = converted.RawDataType,
|
||||||
|
RawDiagnostic = string.IsNullOrWhiteSpace(converted.RawDiagnostic)
|
||||||
|
? "Buffered MXAccess event argument was not a SAFEARRAY."
|
||||||
|
: converted.RawDiagnostic,
|
||||||
|
};
|
||||||
|
|
||||||
|
switch (converted.KindCase)
|
||||||
|
{
|
||||||
|
case MxValue.KindOneofCase.Int32Value:
|
||||||
|
mxArray.Int32Values = new Int32Array();
|
||||||
|
mxArray.Int32Values.Values.Add(converted.Int32Value);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MxValue.KindOneofCase.Int64Value:
|
||||||
|
mxArray.Int64Values = new Int64Array();
|
||||||
|
mxArray.Int64Values.Values.Add(converted.Int64Value);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MxValue.KindOneofCase.TimestampValue:
|
||||||
|
mxArray.TimestampValues = new TimestampArray();
|
||||||
|
mxArray.TimestampValues.Values.Add(converted.TimestampValue);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return mxArray;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,180 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.MxAccess;
|
||||||
|
|
||||||
|
public sealed class MxAccessEventQueue
|
||||||
|
{
|
||||||
|
public const int DefaultCapacity = 10000;
|
||||||
|
|
||||||
|
private readonly int capacity;
|
||||||
|
private readonly Queue<WorkerEvent> events;
|
||||||
|
private readonly object syncRoot = new();
|
||||||
|
private ulong lastEventSequence;
|
||||||
|
private WorkerFault? fault;
|
||||||
|
|
||||||
|
public MxAccessEventQueue()
|
||||||
|
: this(DefaultCapacity)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public MxAccessEventQueue(int capacity)
|
||||||
|
{
|
||||||
|
if (capacity <= 0)
|
||||||
|
{
|
||||||
|
throw new ArgumentOutOfRangeException(
|
||||||
|
nameof(capacity),
|
||||||
|
"MXAccess event queue capacity must be greater than zero.");
|
||||||
|
}
|
||||||
|
|
||||||
|
this.capacity = capacity;
|
||||||
|
events = new Queue<WorkerEvent>(capacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int Capacity => capacity;
|
||||||
|
|
||||||
|
public int Count
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
lock (syncRoot)
|
||||||
|
{
|
||||||
|
return events.Count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public ulong LastEventSequence
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
lock (syncRoot)
|
||||||
|
{
|
||||||
|
return lastEventSequence;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool IsFaulted
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
lock (syncRoot)
|
||||||
|
{
|
||||||
|
return fault is not null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFault? Fault
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
lock (syncRoot)
|
||||||
|
{
|
||||||
|
return fault?.Clone();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerEvent Enqueue(MxEvent mxEvent)
|
||||||
|
{
|
||||||
|
if (mxEvent is null)
|
||||||
|
{
|
||||||
|
throw new ArgumentNullException(nameof(mxEvent));
|
||||||
|
}
|
||||||
|
|
||||||
|
lock (syncRoot)
|
||||||
|
{
|
||||||
|
if (fault is not null)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException("MXAccess outbound event queue is faulted.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (events.Count >= capacity)
|
||||||
|
{
|
||||||
|
fault = CreateOverflowFault();
|
||||||
|
throw new MxAccessEventQueueOverflowException(capacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
MxEvent queuedEvent = mxEvent.Clone();
|
||||||
|
queuedEvent.WorkerSequence = ++lastEventSequence;
|
||||||
|
queuedEvent.WorkerTimestamp = Timestamp.FromDateTime(DateTime.UtcNow);
|
||||||
|
|
||||||
|
WorkerEvent workerEvent = new()
|
||||||
|
{
|
||||||
|
Event = queuedEvent,
|
||||||
|
};
|
||||||
|
events.Enqueue(workerEvent);
|
||||||
|
|
||||||
|
return workerEvent.Clone();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool TryDequeue(out WorkerEvent? workerEvent)
|
||||||
|
{
|
||||||
|
lock (syncRoot)
|
||||||
|
{
|
||||||
|
if (events.Count == 0)
|
||||||
|
{
|
||||||
|
workerEvent = null;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
workerEvent = events.Dequeue().Clone();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public IReadOnlyList<WorkerEvent> Drain(uint maxEvents)
|
||||||
|
{
|
||||||
|
lock (syncRoot)
|
||||||
|
{
|
||||||
|
int drainCount = maxEvents == 0
|
||||||
|
? events.Count
|
||||||
|
: Math.Min(events.Count, checked((int)Math.Min(maxEvents, int.MaxValue)));
|
||||||
|
if (drainCount == 0)
|
||||||
|
{
|
||||||
|
return Array.Empty<WorkerEvent>();
|
||||||
|
}
|
||||||
|
|
||||||
|
List<WorkerEvent> drained = new(drainCount);
|
||||||
|
for (int index = 0; index < drainCount; index++)
|
||||||
|
{
|
||||||
|
drained.Add(events.Dequeue().Clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
return drained;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void RecordFault(WorkerFault workerFault)
|
||||||
|
{
|
||||||
|
if (workerFault is null)
|
||||||
|
{
|
||||||
|
throw new ArgumentNullException(nameof(workerFault));
|
||||||
|
}
|
||||||
|
|
||||||
|
lock (syncRoot)
|
||||||
|
{
|
||||||
|
fault ??= workerFault.Clone();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private WorkerFault CreateOverflowFault()
|
||||||
|
{
|
||||||
|
string message = $"MXAccess outbound event queue reached capacity {capacity}.";
|
||||||
|
return new WorkerFault
|
||||||
|
{
|
||||||
|
Category = WorkerFaultCategory.QueueOverflow,
|
||||||
|
DiagnosticMessage = message,
|
||||||
|
ProtocolStatus = new ProtocolStatus
|
||||||
|
{
|
||||||
|
Code = ProtocolStatusCode.WorkerUnavailable,
|
||||||
|
Message = message,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,14 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.MxAccess;
|
||||||
|
|
||||||
|
public sealed class MxAccessEventQueueOverflowException : Exception
|
||||||
|
{
|
||||||
|
public MxAccessEventQueueOverflowException(int capacity)
|
||||||
|
: base($"MXAccess outbound event queue reached its configured capacity of {capacity}.")
|
||||||
|
{
|
||||||
|
Capacity = capacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int Capacity { get; }
|
||||||
|
}
|
||||||
@@ -44,7 +44,8 @@ public sealed class MxAccessSession : IDisposable
|
|||||||
|
|
||||||
public static MxAccessSession Create(
|
public static MxAccessSession Create(
|
||||||
IMxAccessComObjectFactory factory,
|
IMxAccessComObjectFactory factory,
|
||||||
IMxAccessEventSink eventSink)
|
IMxAccessEventSink eventSink,
|
||||||
|
string sessionId)
|
||||||
{
|
{
|
||||||
if (factory is null)
|
if (factory is null)
|
||||||
{
|
{
|
||||||
@@ -66,7 +67,7 @@ public sealed class MxAccessSession : IDisposable
|
|||||||
throw new InvalidOperationException("MXAccess COM factory returned null.");
|
throw new InvalidOperationException("MXAccess COM factory returned null.");
|
||||||
}
|
}
|
||||||
|
|
||||||
eventSink.Attach(mxAccessComObject);
|
eventSink.Attach(mxAccessComObject, sessionId);
|
||||||
|
|
||||||
return new MxAccessSession(
|
return new MxAccessSession(
|
||||||
mxAccessComObject,
|
mxAccessComObject,
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ public sealed class MxAccessStaSession : IDisposable
|
|||||||
{
|
{
|
||||||
private readonly IMxAccessComObjectFactory factory;
|
private readonly IMxAccessComObjectFactory factory;
|
||||||
private readonly IMxAccessEventSink eventSink;
|
private readonly IMxAccessEventSink eventSink;
|
||||||
|
private readonly MxAccessEventQueue eventQueue;
|
||||||
private readonly StaRuntime staRuntime;
|
private readonly StaRuntime staRuntime;
|
||||||
private StaCommandDispatcher? commandDispatcher;
|
private StaCommandDispatcher? commandDispatcher;
|
||||||
private MxAccessSession? session;
|
private MxAccessSession? session;
|
||||||
@@ -20,7 +21,7 @@ public sealed class MxAccessStaSession : IDisposable
|
|||||||
: this(
|
: this(
|
||||||
new StaRuntime(),
|
new StaRuntime(),
|
||||||
new MxAccessComObjectFactory(),
|
new MxAccessComObjectFactory(),
|
||||||
new MxAccessBaseEventSink())
|
new MxAccessEventQueue())
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -28,13 +29,41 @@ public sealed class MxAccessStaSession : IDisposable
|
|||||||
StaRuntime staRuntime,
|
StaRuntime staRuntime,
|
||||||
IMxAccessComObjectFactory factory,
|
IMxAccessComObjectFactory factory,
|
||||||
IMxAccessEventSink eventSink)
|
IMxAccessEventSink eventSink)
|
||||||
|
: this(staRuntime, factory, eventSink, new MxAccessEventQueue())
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public MxAccessStaSession(
|
||||||
|
StaRuntime staRuntime,
|
||||||
|
IMxAccessComObjectFactory factory,
|
||||||
|
MxAccessEventQueue eventQueue)
|
||||||
|
: this(staRuntime, factory, new MxAccessBaseEventSink(eventQueue), eventQueue)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public MxAccessStaSession(
|
||||||
|
StaRuntime staRuntime,
|
||||||
|
IMxAccessComObjectFactory factory,
|
||||||
|
IMxAccessEventSink eventSink,
|
||||||
|
MxAccessEventQueue eventQueue)
|
||||||
{
|
{
|
||||||
this.staRuntime = staRuntime ?? throw new ArgumentNullException(nameof(staRuntime));
|
this.staRuntime = staRuntime ?? throw new ArgumentNullException(nameof(staRuntime));
|
||||||
this.factory = factory ?? throw new ArgumentNullException(nameof(factory));
|
this.factory = factory ?? throw new ArgumentNullException(nameof(factory));
|
||||||
this.eventSink = eventSink ?? throw new ArgumentNullException(nameof(eventSink));
|
this.eventSink = eventSink ?? throw new ArgumentNullException(nameof(eventSink));
|
||||||
|
this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue));
|
||||||
|
}
|
||||||
|
|
||||||
|
public MxAccessEventQueue EventQueue => eventQueue;
|
||||||
|
|
||||||
|
public Task<WorkerReady> StartAsync(
|
||||||
|
int workerProcessId,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return StartAsync(string.Empty, workerProcessId, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<WorkerReady> StartAsync(
|
public Task<WorkerReady> StartAsync(
|
||||||
|
string sessionId,
|
||||||
int workerProcessId,
|
int workerProcessId,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
@@ -48,7 +77,7 @@ public sealed class MxAccessStaSession : IDisposable
|
|||||||
throw new InvalidOperationException("MXAccess COM session has already been created.");
|
throw new InvalidOperationException("MXAccess COM session has already been created.");
|
||||||
}
|
}
|
||||||
|
|
||||||
session = MxAccessSession.Create(factory, eventSink);
|
session = MxAccessSession.Create(factory, eventSink, sessionId);
|
||||||
commandDispatcher = new StaCommandDispatcher(
|
commandDispatcher = new StaCommandDispatcher(
|
||||||
staRuntime,
|
staRuntime,
|
||||||
new MxAccessCommandExecutor(session));
|
new MxAccessCommandExecutor(session));
|
||||||
@@ -68,6 +97,11 @@ public sealed class MxAccessStaSession : IDisposable
|
|||||||
return commandDispatcher.DispatchAsync(command);
|
return commandDispatcher.DispatchAsync(command);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public IReadOnlyList<WorkerEvent> DrainEvents(uint maxEvents)
|
||||||
|
{
|
||||||
|
return eventQueue.Drain(maxEvents);
|
||||||
|
}
|
||||||
|
|
||||||
public Task<IReadOnlyList<RegisteredServerHandle>> GetRegisteredServerHandlesAsync(
|
public Task<IReadOnlyList<RegisteredServerHandle>> GetRegisteredServerHandlesAsync(
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user