Merge remote-tracking branch 'origin/main' into agent-3/issue-32-implement-heartbeat-and-watchdog

# Conflicts:
#	src/MxGateway.Worker/Ipc/WorkerPipeSession.cs
#	src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs
This commit is contained in:
Joseph Doherty
2026-04-26 19:16:42 -04:00
70 changed files with 3195 additions and 364 deletions
@@ -72,7 +72,7 @@ public sealed class WorkerPipeSession
try
{
await CompleteStartupHandshakeAsync(
token => _runtimeSession.StartAsync(_processIdProvider(), token),
token => _runtimeSession.StartAsync(_options.SessionId, _processIdProvider(), token),
cancellationToken).ConfigureAwait(false);
await RunMessageLoopAsync(cancellationToken).ConfigureAwait(false);
}
@@ -505,7 +505,7 @@ public sealed class WorkerPipeSession
try
{
return await _runtimeSession
.StartAsync(_processIdProvider(), cancellationToken)
.StartAsync(_options.SessionId, _processIdProvider(), cancellationToken)
.ConfigureAwait(false);
}
catch
@@ -2,7 +2,9 @@ namespace MxGateway.Worker.MxAccess;
public interface IMxAccessEventSink
{
void Attach(object mxAccessComObject);
void Attach(
object mxAccessComObject,
string sessionId);
void Detach();
}
@@ -18,4 +18,16 @@ public interface IMxAccessServer
void RemoveItem(
int serverHandle,
int itemHandle);
void Advise(
int serverHandle,
int itemHandle);
void UnAdvise(
int serverHandle,
int itemHandle);
void AdviseSupervisory(
int serverHandle,
int itemHandle);
}
@@ -9,6 +9,7 @@ namespace MxGateway.Worker.MxAccess;
public interface IWorkerRuntimeSession : IDisposable
{
Task<WorkerReady> StartAsync(
string sessionId,
int workerProcessId,
CancellationToken cancellationToken = default);
@@ -0,0 +1,7 @@
namespace MxGateway.Worker.MxAccess;
public enum MxAccessAdviceKind
{
Plain = 1,
Supervisory = 2,
}
@@ -1,13 +1,39 @@
using System;
using ArchestrA.MxAccess;
using Proto = MxGateway.Contracts.Proto;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessBaseEventSink : IMxAccessEventSink
{
private readonly MxAccessEventMapper eventMapper;
private readonly MxAccessEventQueue eventQueue;
private LMXProxyServerClass? server;
private string sessionId = string.Empty;
public void Attach(object mxAccessComObject)
public MxAccessBaseEventSink()
: this(new MxAccessEventQueue())
{
}
public MxAccessBaseEventSink(MxAccessEventQueue eventQueue)
: this(eventQueue, new MxAccessEventMapper())
{
}
public MxAccessBaseEventSink(
MxAccessEventQueue eventQueue,
MxAccessEventMapper eventMapper)
{
this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue));
this.eventMapper = eventMapper ?? throw new ArgumentNullException(nameof(eventMapper));
}
public void Attach(
object mxAccessComObject,
string sessionId)
{
this.sessionId = sessionId ?? string.Empty;
server = (LMXProxyServerClass)mxAccessComObject;
server.OnDataChange += OnDataChange;
server.OnWriteComplete += OnWriteComplete;
@@ -27,9 +53,10 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
server.OperationComplete -= OperationComplete;
server.OnBufferedDataChange -= OnBufferedDataChange;
server = null;
sessionId = string.Empty;
}
private static void OnDataChange(
private void OnDataChange(
int hLMXServerHandle,
int phItemHandle,
object pvItemValue,
@@ -37,23 +64,44 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
object pftItemTimeStamp,
ref MXSTATUS_PROXY[] pVars)
{
MXSTATUS_PROXY[] statuses = pVars;
EnqueueEvent(() => eventMapper.CreateOnDataChange(
sessionId,
hLMXServerHandle,
phItemHandle,
pvItemValue,
pwItemQuality,
pftItemTimeStamp,
statuses));
}
private static void OnWriteComplete(
private void OnWriteComplete(
int hLMXServerHandle,
int phItemHandle,
ref MXSTATUS_PROXY[] pVars)
{
MXSTATUS_PROXY[] statuses = pVars;
EnqueueEvent(() => eventMapper.CreateOnWriteComplete(
sessionId,
hLMXServerHandle,
phItemHandle,
statuses));
}
private static void OperationComplete(
private void OperationComplete(
int hLMXServerHandle,
int phItemHandle,
ref MXSTATUS_PROXY[] pVars)
{
MXSTATUS_PROXY[] statuses = pVars;
EnqueueEvent(() => eventMapper.CreateOperationComplete(
sessionId,
hLMXServerHandle,
phItemHandle,
statuses));
}
private static void OnBufferedDataChange(
private void OnBufferedDataChange(
int hLMXServerHandle,
int phItemHandle,
MxDataType dtDataType,
@@ -62,5 +110,42 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
object pftItemTimeStamp,
ref MXSTATUS_PROXY[] pVars)
{
MXSTATUS_PROXY[] statuses = pVars;
EnqueueEvent(() => eventMapper.CreateOnBufferedDataChange(
sessionId,
hLMXServerHandle,
phItemHandle,
(int)dtDataType,
pvItemValue,
pwItemQuality,
pftItemTimeStamp,
statuses));
}
private void EnqueueEvent(Func<Proto.MxEvent> createEvent)
{
try
{
eventQueue.Enqueue(createEvent());
}
catch (Exception exception)
{
eventQueue.RecordFault(CreateEventConversionFault(exception));
}
}
private Proto.WorkerFault CreateEventConversionFault(Exception exception)
{
return new Proto.WorkerFault
{
Category = Proto.WorkerFaultCategory.MxaccessEventConversionFailed,
ExceptionType = exception.GetType().FullName ?? string.Empty,
DiagnosticMessage = $"{exception.GetType().FullName}: HRESULT 0x{unchecked((uint)exception.HResult):X8}",
ProtocolStatus = new Proto.ProtocolStatus
{
Code = Proto.ProtocolStatusCode.MxaccessFailure,
Message = "MXAccess event conversion failed.",
},
};
}
}
@@ -73,6 +73,45 @@ public sealed class MxAccessComServer : IMxAccessServer
Invoke(nameof(RemoveItem), serverHandle, itemHandle);
}
public void Advise(
int serverHandle,
int itemHandle)
{
if (mxAccessComObject is ILMXProxyServer mxAccessServer)
{
mxAccessServer.Advise(serverHandle, itemHandle);
return;
}
Invoke(nameof(Advise), serverHandle, itemHandle);
}
public void UnAdvise(
int serverHandle,
int itemHandle)
{
if (mxAccessComObject is ILMXProxyServer mxAccessServer)
{
mxAccessServer.UnAdvise(serverHandle, itemHandle);
return;
}
Invoke(nameof(UnAdvise), serverHandle, itemHandle);
}
public void AdviseSupervisory(
int serverHandle,
int itemHandle)
{
if (mxAccessComObject is ILMXProxyServer4 mxAccessServer)
{
mxAccessServer.AdviseSupervisory(serverHandle, itemHandle);
return;
}
Invoke(nameof(AdviseSupervisory), serverHandle, itemHandle);
}
private object Invoke(
string methodName,
params object[] arguments)
@@ -37,6 +37,9 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
MxCommandKind.AddItem => ExecuteAddItem(command),
MxCommandKind.AddItem2 => ExecuteAddItem2(command),
MxCommandKind.RemoveItem => ExecuteRemoveItem(command),
MxCommandKind.Advise => ExecuteAdvise(command),
MxCommandKind.UnAdvise => ExecuteUnAdvise(command),
MxCommandKind.AdviseSupervisory => ExecuteAdviseSupervisory(command),
_ => CreateInvalidRequestReply(command, $"Unsupported MXAccess command kind {command.Kind}."),
};
}
@@ -130,6 +133,51 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
return CreateOkReply(command);
}
private MxCommandReply ExecuteAdvise(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.Advise)
{
return CreateInvalidRequestReply(command, "Advise command payload is required.");
}
AdviseCommand adviseCommand = command.Command.Advise;
session.Advise(
adviseCommand.ServerHandle,
adviseCommand.ItemHandle);
return CreateOkReply(command);
}
private MxCommandReply ExecuteUnAdvise(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.UnAdvise)
{
return CreateInvalidRequestReply(command, "UnAdvise command payload is required.");
}
UnAdviseCommand unAdviseCommand = command.Command.UnAdvise;
session.UnAdvise(
unAdviseCommand.ServerHandle,
unAdviseCommand.ItemHandle);
return CreateOkReply(command);
}
private MxCommandReply ExecuteAdviseSupervisory(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.AdviseSupervisory)
{
return CreateInvalidRequestReply(command, "AdviseSupervisory command payload is required.");
}
AdviseSupervisoryCommand adviseSupervisoryCommand = command.Command.AdviseSupervisory;
session.AdviseSupervisory(
adviseSupervisoryCommand.ServerHandle,
adviseSupervisoryCommand.ItemHandle);
return CreateOkReply(command);
}
private static MxCommandReply CreateOkReply(StaCommand command)
{
return new MxCommandReply
@@ -0,0 +1,221 @@
using System;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Conversion;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessEventMapper
{
private readonly VariantConverter variantConverter;
private readonly MxStatusProxyConverter statusProxyConverter;
public MxAccessEventMapper()
: this(new VariantConverter(), new MxStatusProxyConverter())
{
}
public MxAccessEventMapper(
VariantConverter variantConverter,
MxStatusProxyConverter statusProxyConverter)
{
this.variantConverter = variantConverter ?? throw new ArgumentNullException(nameof(variantConverter));
this.statusProxyConverter = statusProxyConverter ?? throw new ArgumentNullException(nameof(statusProxyConverter));
}
public MxEvent CreateOnDataChange(
string sessionId,
int serverHandle,
int itemHandle,
object? value,
int quality,
object? timestamp,
Array? statuses)
{
MxEvent mxEvent = CreateBaseEvent(
MxEventFamily.OnDataChange,
sessionId,
serverHandle,
itemHandle,
statuses);
mxEvent.Value = variantConverter.Convert(value);
mxEvent.Quality = quality;
ApplySourceTimestamp(mxEvent, timestamp);
mxEvent.OnDataChange = new OnDataChangeEvent();
return mxEvent;
}
public MxEvent CreateOnWriteComplete(
string sessionId,
int serverHandle,
int itemHandle,
Array? statuses)
{
MxEvent mxEvent = CreateBaseEvent(
MxEventFamily.OnWriteComplete,
sessionId,
serverHandle,
itemHandle,
statuses);
mxEvent.OnWriteComplete = new OnWriteCompleteEvent();
return mxEvent;
}
public MxEvent CreateOperationComplete(
string sessionId,
int serverHandle,
int itemHandle,
Array? statuses)
{
MxEvent mxEvent = CreateBaseEvent(
MxEventFamily.OperationComplete,
sessionId,
serverHandle,
itemHandle,
statuses);
mxEvent.OperationComplete = new OperationCompleteEvent();
return mxEvent;
}
public MxEvent CreateOnBufferedDataChange(
string sessionId,
int serverHandle,
int itemHandle,
int rawDataType,
object? value,
object? quality,
object? timestamp,
Array? statuses)
{
MxDataType dataType = MapMxDataType(rawDataType);
MxEvent mxEvent = CreateBaseEvent(
MxEventFamily.OnBufferedDataChange,
sessionId,
serverHandle,
itemHandle,
statuses);
mxEvent.Value = variantConverter.Convert(value, dataType);
mxEvent.OnBufferedDataChange = new OnBufferedDataChangeEvent
{
DataType = dataType,
RawDataType = rawDataType,
QualityValues = ConvertBufferedArray(quality, MxDataType.Integer),
TimestampValues = ConvertBufferedArray(timestamp, MxDataType.Time),
};
return mxEvent;
}
public static MxDataType MapMxDataType(int rawDataType)
{
return rawDataType switch
{
-1 => MxDataType.Unknown,
0 => MxDataType.NoData,
1 => MxDataType.Boolean,
2 => MxDataType.Integer,
3 => MxDataType.Float,
4 => MxDataType.Double,
5 => MxDataType.String,
6 => MxDataType.Time,
7 => MxDataType.ElapsedTime,
8 => MxDataType.ReferenceType,
9 => MxDataType.StatusType,
10 => MxDataType.Enum,
11 => MxDataType.SecurityClassificationEnum,
12 => MxDataType.DataQualityType,
13 => MxDataType.QualifiedEnum,
14 => MxDataType.QualifiedStruct,
15 => MxDataType.InternationalizedString,
16 => MxDataType.BigString,
17 => MxDataType.End,
_ => MxDataType.Unknown,
};
}
private MxEvent CreateBaseEvent(
MxEventFamily family,
string sessionId,
int serverHandle,
int itemHandle,
Array? statuses)
{
MxEvent mxEvent = new()
{
Family = family,
SessionId = sessionId ?? string.Empty,
ServerHandle = serverHandle,
ItemHandle = itemHandle,
};
mxEvent.Statuses.Add(statusProxyConverter.ConvertMany(statuses));
return mxEvent;
}
private void ApplySourceTimestamp(
MxEvent mxEvent,
object? timestamp)
{
MxValue convertedTimestamp = variantConverter.Convert(timestamp, MxDataType.Time);
if (convertedTimestamp.KindCase == MxValue.KindOneofCase.TimestampValue)
{
mxEvent.SourceTimestamp = convertedTimestamp.TimestampValue;
return;
}
if (!string.IsNullOrWhiteSpace(convertedTimestamp.RawDiagnostic))
{
mxEvent.RawStatus = string.IsNullOrWhiteSpace(mxEvent.RawStatus)
? convertedTimestamp.RawDiagnostic
: $"{mxEvent.RawStatus}; {convertedTimestamp.RawDiagnostic}";
}
}
private MxArray ConvertBufferedArray(
object? value,
MxDataType expectedElementDataType)
{
if (value is Array array)
{
return variantConverter.ConvertArray(array, expectedElementDataType);
}
MxValue converted = variantConverter.Convert(value, expectedElementDataType);
if (converted.KindCase == MxValue.KindOneofCase.ArrayValue)
{
return converted.ArrayValue;
}
MxArray mxArray = new()
{
ElementDataType = converted.DataType,
VariantType = converted.VariantType,
RawElementDataType = converted.RawDataType,
RawDiagnostic = string.IsNullOrWhiteSpace(converted.RawDiagnostic)
? "Buffered MXAccess event argument was not a SAFEARRAY."
: converted.RawDiagnostic,
};
switch (converted.KindCase)
{
case MxValue.KindOneofCase.Int32Value:
mxArray.Int32Values = new Int32Array();
mxArray.Int32Values.Values.Add(converted.Int32Value);
break;
case MxValue.KindOneofCase.Int64Value:
mxArray.Int64Values = new Int64Array();
mxArray.Int64Values.Values.Add(converted.Int64Value);
break;
case MxValue.KindOneofCase.TimestampValue:
mxArray.TimestampValues = new TimestampArray();
mxArray.TimestampValues.Values.Add(converted.TimestampValue);
break;
}
return mxArray;
}
}
@@ -0,0 +1,180 @@
using System;
using System.Collections.Generic;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessEventQueue
{
public const int DefaultCapacity = 10000;
private readonly int capacity;
private readonly Queue<WorkerEvent> events;
private readonly object syncRoot = new();
private ulong lastEventSequence;
private WorkerFault? fault;
public MxAccessEventQueue()
: this(DefaultCapacity)
{
}
public MxAccessEventQueue(int capacity)
{
if (capacity <= 0)
{
throw new ArgumentOutOfRangeException(
nameof(capacity),
"MXAccess event queue capacity must be greater than zero.");
}
this.capacity = capacity;
events = new Queue<WorkerEvent>(capacity);
}
public int Capacity => capacity;
public int Count
{
get
{
lock (syncRoot)
{
return events.Count;
}
}
}
public ulong LastEventSequence
{
get
{
lock (syncRoot)
{
return lastEventSequence;
}
}
}
public bool IsFaulted
{
get
{
lock (syncRoot)
{
return fault is not null;
}
}
}
public WorkerFault? Fault
{
get
{
lock (syncRoot)
{
return fault?.Clone();
}
}
}
public WorkerEvent Enqueue(MxEvent mxEvent)
{
if (mxEvent is null)
{
throw new ArgumentNullException(nameof(mxEvent));
}
lock (syncRoot)
{
if (fault is not null)
{
throw new InvalidOperationException("MXAccess outbound event queue is faulted.");
}
if (events.Count >= capacity)
{
fault = CreateOverflowFault();
throw new MxAccessEventQueueOverflowException(capacity);
}
MxEvent queuedEvent = mxEvent.Clone();
queuedEvent.WorkerSequence = ++lastEventSequence;
queuedEvent.WorkerTimestamp = Timestamp.FromDateTime(DateTime.UtcNow);
WorkerEvent workerEvent = new()
{
Event = queuedEvent,
};
events.Enqueue(workerEvent);
return workerEvent.Clone();
}
}
public bool TryDequeue(out WorkerEvent? workerEvent)
{
lock (syncRoot)
{
if (events.Count == 0)
{
workerEvent = null;
return false;
}
workerEvent = events.Dequeue().Clone();
return true;
}
}
public IReadOnlyList<WorkerEvent> Drain(uint maxEvents)
{
lock (syncRoot)
{
int drainCount = maxEvents == 0
? events.Count
: Math.Min(events.Count, checked((int)Math.Min(maxEvents, int.MaxValue)));
if (drainCount == 0)
{
return Array.Empty<WorkerEvent>();
}
List<WorkerEvent> drained = new(drainCount);
for (int index = 0; index < drainCount; index++)
{
drained.Add(events.Dequeue().Clone());
}
return drained;
}
}
public void RecordFault(WorkerFault workerFault)
{
if (workerFault is null)
{
throw new ArgumentNullException(nameof(workerFault));
}
lock (syncRoot)
{
fault ??= workerFault.Clone();
}
}
private WorkerFault CreateOverflowFault()
{
string message = $"MXAccess outbound event queue reached capacity {capacity}.";
return new WorkerFault
{
Category = WorkerFaultCategory.QueueOverflow,
DiagnosticMessage = message,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = message,
},
};
}
}
@@ -0,0 +1,14 @@
using System;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessEventQueueOverflowException : Exception
{
public MxAccessEventQueueOverflowException(int capacity)
: base($"MXAccess outbound event queue reached its configured capacity of {capacity}.")
{
Capacity = capacity;
}
public int Capacity { get; }
}
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
@@ -7,6 +8,7 @@ public sealed class MxAccessHandleRegistry
{
private readonly Dictionary<int, RegisteredServerHandle> serverHandles = new();
private readonly Dictionary<long, RegisteredItemHandle> itemHandles = new();
private readonly Dictionary<AdviceHandleKey, RegisteredAdviceHandle> adviceHandles = new();
public IReadOnlyList<RegisteredServerHandle> ServerHandles => serverHandles
.Values
@@ -19,6 +21,13 @@ public sealed class MxAccessHandleRegistry
.ThenBy(handle => handle.ItemHandle)
.ToArray();
public IReadOnlyList<RegisteredAdviceHandle> AdviceHandles => adviceHandles
.Values
.OrderBy(handle => handle.ServerHandle)
.ThenBy(handle => handle.ItemHandle)
.ThenBy(handle => handle.AdviceKind)
.ToArray();
public void RegisterServerHandle(
int serverHandle,
string clientName)
@@ -37,6 +46,14 @@ public sealed class MxAccessHandleRegistry
{
itemHandles.Remove(key);
}
foreach (AdviceHandleKey key in adviceHandles
.Where(pair => pair.Value.ServerHandle == serverHandle)
.Select(pair => pair.Key)
.ToArray())
{
adviceHandles.Remove(key);
}
}
public bool ContainsServerHandle(int serverHandle)
@@ -64,6 +81,7 @@ public sealed class MxAccessHandleRegistry
int itemHandle)
{
itemHandles.Remove(CreateItemKey(serverHandle, itemHandle));
RemoveAdviceHandles(serverHandle, itemHandle);
}
public bool ContainsItemHandle(
@@ -73,10 +91,84 @@ public sealed class MxAccessHandleRegistry
return itemHandles.ContainsKey(CreateItemKey(serverHandle, itemHandle));
}
public void RegisterAdviceHandle(
int serverHandle,
int itemHandle,
MxAccessAdviceKind adviceKind)
{
AdviceHandleKey key = new(serverHandle, itemHandle, adviceKind);
adviceHandles[key] = new RegisteredAdviceHandle(
serverHandle,
itemHandle,
adviceKind);
}
public void RemoveAdviceHandles(
int serverHandle,
int itemHandle)
{
foreach (AdviceHandleKey key in adviceHandles
.Where(pair => pair.Value.ServerHandle == serverHandle && pair.Value.ItemHandle == itemHandle)
.Select(pair => pair.Key)
.ToArray())
{
adviceHandles.Remove(key);
}
}
public bool ContainsAdviceHandle(
int serverHandle,
int itemHandle,
MxAccessAdviceKind adviceKind)
{
return adviceHandles.ContainsKey(new AdviceHandleKey(serverHandle, itemHandle, adviceKind));
}
private static long CreateItemKey(
int serverHandle,
int itemHandle)
{
return ((long)serverHandle << 32) | (uint)itemHandle;
}
private readonly struct AdviceHandleKey : IEquatable<AdviceHandleKey>
{
private readonly int serverHandle;
private readonly int itemHandle;
private readonly MxAccessAdviceKind adviceKind;
public AdviceHandleKey(
int serverHandle,
int itemHandle,
MxAccessAdviceKind adviceKind)
{
this.serverHandle = serverHandle;
this.itemHandle = itemHandle;
this.adviceKind = adviceKind;
}
public bool Equals(AdviceHandleKey other)
{
return serverHandle == other.serverHandle
&& itemHandle == other.itemHandle
&& adviceKind == other.adviceKind;
}
public override bool Equals(object? obj)
{
return obj is AdviceHandleKey other && Equals(other);
}
public override int GetHashCode()
{
unchecked
{
int hashCode = serverHandle;
hashCode = (hashCode * 397) ^ itemHandle;
hashCode = (hashCode * 397) ^ (int)adviceKind;
return hashCode;
}
}
}
}
@@ -44,7 +44,8 @@ public sealed class MxAccessSession : IDisposable
public static MxAccessSession Create(
IMxAccessComObjectFactory factory,
IMxAccessEventSink eventSink)
IMxAccessEventSink eventSink,
string sessionId)
{
if (factory is null)
{
@@ -66,7 +67,7 @@ public sealed class MxAccessSession : IDisposable
throw new InvalidOperationException("MXAccess COM factory returned null.");
}
eventSink.Attach(mxAccessComObject);
eventSink.Attach(mxAccessComObject, sessionId);
return new MxAccessSession(
mxAccessComObject,
@@ -151,6 +152,42 @@ public sealed class MxAccessSession : IDisposable
handleRegistry.RemoveItemHandle(serverHandle, itemHandle);
}
public void Advise(
int serverHandle,
int itemHandle)
{
ThrowIfDisposed();
mxAccessServer.Advise(serverHandle, itemHandle);
handleRegistry.RegisterAdviceHandle(
serverHandle,
itemHandle,
MxAccessAdviceKind.Plain);
}
public void UnAdvise(
int serverHandle,
int itemHandle)
{
ThrowIfDisposed();
mxAccessServer.UnAdvise(serverHandle, itemHandle);
handleRegistry.RemoveAdviceHandles(serverHandle, itemHandle);
}
public void AdviseSupervisory(
int serverHandle,
int itemHandle)
{
ThrowIfDisposed();
mxAccessServer.AdviseSupervisory(serverHandle, itemHandle);
handleRegistry.RegisterAdviceHandle(
serverHandle,
itemHandle,
MxAccessAdviceKind.Supervisory);
}
public void Dispose()
{
if (disposed)
@@ -11,6 +11,7 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
{
private readonly IMxAccessComObjectFactory factory;
private readonly IMxAccessEventSink eventSink;
private readonly MxAccessEventQueue eventQueue;
private readonly StaRuntime staRuntime;
private StaCommandDispatcher? commandDispatcher;
private MxAccessSession? session;
@@ -20,7 +21,7 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
: this(
new StaRuntime(),
new MxAccessComObjectFactory(),
new MxAccessBaseEventSink())
new MxAccessEventQueue())
{
}
@@ -28,13 +29,41 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
StaRuntime staRuntime,
IMxAccessComObjectFactory factory,
IMxAccessEventSink eventSink)
: this(staRuntime, factory, eventSink, new MxAccessEventQueue())
{
}
public MxAccessStaSession(
StaRuntime staRuntime,
IMxAccessComObjectFactory factory,
MxAccessEventQueue eventQueue)
: this(staRuntime, factory, new MxAccessBaseEventSink(eventQueue), eventQueue)
{
}
public MxAccessStaSession(
StaRuntime staRuntime,
IMxAccessComObjectFactory factory,
IMxAccessEventSink eventSink,
MxAccessEventQueue eventQueue)
{
this.staRuntime = staRuntime ?? throw new ArgumentNullException(nameof(staRuntime));
this.factory = factory ?? throw new ArgumentNullException(nameof(factory));
this.eventSink = eventSink ?? throw new ArgumentNullException(nameof(eventSink));
this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue));
}
public MxAccessEventQueue EventQueue => eventQueue;
public Task<WorkerReady> StartAsync(
int workerProcessId,
CancellationToken cancellationToken = default)
{
return StartAsync(string.Empty, workerProcessId, cancellationToken);
}
public Task<WorkerReady> StartAsync(
string sessionId,
int workerProcessId,
CancellationToken cancellationToken = default)
{
@@ -48,7 +77,7 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
throw new InvalidOperationException("MXAccess COM session has already been created.");
}
session = MxAccessSession.Create(factory, eventSink);
session = MxAccessSession.Create(factory, eventSink, sessionId);
commandDispatcher = new StaCommandDispatcher(
staRuntime,
new MxAccessCommandExecutor(session));
@@ -82,8 +111,8 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
return new WorkerRuntimeHeartbeatSnapshot(
staRuntime.LastActivityUtc,
pendingCommandCount,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
(uint)eventQueue.Count,
eventQueue.LastEventSequence,
currentCommandCorrelationId);
}
@@ -92,6 +121,11 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
commandDispatcher?.RequestShutdown();
}
public IReadOnlyList<WorkerEvent> DrainEvents(uint maxEvents)
{
return eventQueue.Drain(maxEvents);
}
public Task<IReadOnlyList<RegisteredServerHandle>> GetRegisteredServerHandlesAsync(
CancellationToken cancellationToken = default)
{
@@ -118,6 +152,19 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
cancellationToken);
}
public Task<IReadOnlyList<RegisteredAdviceHandle>> GetRegisteredAdviceHandlesAsync(
CancellationToken cancellationToken = default)
{
if (session is null)
{
throw new InvalidOperationException("MXAccess COM session has not been started.");
}
return staRuntime.InvokeAsync(
() => session.HandleRegistry.AdviceHandles,
cancellationToken);
}
public void Dispose()
{
if (disposed)
@@ -0,0 +1,20 @@
namespace MxGateway.Worker.MxAccess;
public sealed class RegisteredAdviceHandle
{
public RegisteredAdviceHandle(
int serverHandle,
int itemHandle,
MxAccessAdviceKind adviceKind)
{
ServerHandle = serverHandle;
ItemHandle = itemHandle;
AdviceKind = adviceKind;
}
public int ServerHandle { get; }
public int ItemHandle { get; }
public MxAccessAdviceKind AdviceKind { get; }
}