Add bulk read/write command family across worker, gateway, and clients

Adds five new MXAccess command kinds (WriteBulk, Write2Bulk,
WriteSecuredBulk, WriteSecured2Bulk, ReadBulk) that ride the existing
"one round-trip, per-entry results" bulk shape used by AddItemBulk and
SubscribeBulk today. MXAccess COM has no native bulk API; the worker
runs each bulk operation as a sequential loop on its STA, returning
one BulkWriteResult / BulkReadResult per requested entry so per-item
MXAccess failures surface as was_successful=false rather than throwing.

ReadBulk has no MXAccess analogue. The worker satisfies it by:

  - Returning the last cached OnDataChange payload (was_cached=true)
    when the requested tag is already in the session''s item registry
    AND advised — the existing subscription is NOT touched, since the
    caller did not create it.
  - Otherwise taking the AddItem + Advise + wait-for-OnDataChange +
    UnAdvise + RemoveItem snapshot lifecycle itself (was_cached=false)
    and leaving the session exactly as it was. The wait pumps Windows
    messages on the STA so the inbound MXAccess event can dispatch
    while the executor still holds the thread.

The new MxAccessValueCache lives on each MxAccessSession, shared with
MxAccessBaseEventSink which populates it on every OnDataChange after
the event clears the outbound queue. Eviction on RemoveItem keeps
reused MXAccess handles from serving stale values from a previous
lifetime.

Gateway-side authorization wires WriteBulk/Write2Bulk to invoke:write,
WriteSecuredBulk/WriteSecured2Bulk to invoke:secure, ReadBulk to
invoke:read. The constraint-filter pipeline is refactored from a single
BulkConstraintPlan record into an abstract base plus three concretes
(SubscribeBulk, WriteBulk, ReadBulk), each owning its own denied-entry
merge so the dispatch site never branches on reply shape. A new
FilterWriteBulkAsync<TEntry> generic over the four write-entry shapes
runs CheckWriteHandleAsync per entry; denied entries surface as the
BulkWriteResult shape, preserving original-index order.

All five language clients (.NET, Go, Rust, Python, Java) gained the
five new methods following their existing bulk pattern, with regenerated
protobufs.

Tests added:
  - MxAccessValueCacheTests (6 cases) — Set/TryGet, Remove resets the
    version, TryWaitForUpdate signals on Set, pump step fires each poll.
  - MxAccessBaseEventSinkTests — OnDataChange populates the cache,
    ValueCache property exposes the bound instance.
  - MxAccessCommandExecutorTests — four bulk-write variants (per-entry
    success/failure, value+timestamp forwarding, secured user ids),
    ReadBulk snapshot lifecycle on uncached tag (timeout surfaces as
    was_successful=false), invalid-payload reply.
  - GatewayGrpcScopeResolverTests — five new MxCommandKind cases.
  - SessionManagerTests — WriteBulk and ReadBulk forwarding through
    FakeWorkerHarness; ReadBulk forwards timeout_ms.
  - Per-client (.NET, Go, Rust, Python, Java) — WriteBulk builds the
    right command and returns per-entry results, ReadBulk forwards the
    timeout and unpacks the was_cached flag.

Cross-language e2e CLI subcommands for the new bulks are deliberately
scoped out of this change (each of the five client CLIs would need
five new subcommands plus matching phases in
scripts/run-client-e2e-tests.ps1); coverage equivalent to the existing
bulk-subscribe coverage is provided by worker + gateway + per-client
unit tests.

Docs updated in the same commit: gateway.md (Public MXAccess Command
Surface), docs/DesignDecisions.md (new "Bulk Command Family" section
with the ReadBulk cache-then-snapshot rationale), and every client
README.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-20 03:42:38 -04:00
parent 758aca2355
commit 5e375f6d3d
41 changed files with 25624 additions and 1339 deletions
@@ -9,6 +9,7 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
{
private readonly MxAccessEventMapper eventMapper;
private readonly MxAccessEventQueue eventQueue;
private readonly MxAccessValueCache valueCache;
private LMXProxyServerClass? server;
private string sessionId = string.Empty;
@@ -21,7 +22,7 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
/// <summary>Initializes a new instance of the MxAccessBaseEventSink class with a provided queue.</summary>
/// <param name="eventQueue">Queue for buffering converted MXAccess events.</param>
public MxAccessBaseEventSink(MxAccessEventQueue eventQueue)
: this(eventQueue, new MxAccessEventMapper())
: this(eventQueue, new MxAccessEventMapper(), new MxAccessValueCache())
{
}
@@ -31,11 +32,36 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
public MxAccessBaseEventSink(
MxAccessEventQueue eventQueue,
MxAccessEventMapper eventMapper)
: this(eventQueue, eventMapper, new MxAccessValueCache())
{
}
/// <summary>
/// Initializes a new instance of the MxAccessBaseEventSink class with
/// provided queue, mapper, and a shared value cache. The cache is
/// populated from every successful <c>OnDataChange</c> dispatch so the
/// worker's ReadBulk executor can satisfy a "current value" request
/// from an already-advised tag without touching the subscription.
/// </summary>
/// <param name="eventQueue">Queue for buffering converted MXAccess events.</param>
/// <param name="eventMapper">Converter for MXAccess events to protobuf format.</param>
/// <param name="valueCache">Per-session last-value cache shared with the MxAccessSession.</param>
public MxAccessBaseEventSink(
MxAccessEventQueue eventQueue,
MxAccessEventMapper eventMapper,
MxAccessValueCache valueCache)
{
this.eventQueue = eventQueue ?? throw new ArgumentNullException(nameof(eventQueue));
this.eventMapper = eventMapper ?? throw new ArgumentNullException(nameof(eventMapper));
this.valueCache = valueCache ?? throw new ArgumentNullException(nameof(valueCache));
}
/// <summary>
/// The last-value cache populated by this sink. Exposed so the
/// MxAccessSession can share the same instance for ReadBulk lookups.
/// </summary>
public MxAccessValueCache ValueCache => valueCache;
/// <inheritdoc />
public void Attach(
object mxAccessComObject,
@@ -81,14 +107,21 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
ref MXSTATUS_PROXY[] pVars)
{
MXSTATUS_PROXY[] statuses = pVars;
EnqueueEvent(() => eventMapper.CreateOnDataChange(
sessionId,
hLMXServerHandle,
phItemHandle,
pvItemValue,
pwItemQuality,
pftItemTimeStamp,
statuses));
// Build the protobuf event once, enqueue it for the outbound stream, and
// also publish it into the per-session value cache so ReadBulk can serve
// it as a "current value" without re-advising. The cache update is the
// ONLY new side effect — fail-fast on conversion still drops the event
// through the same EnqueueEvent path as before.
EnqueueEvent(
() => eventMapper.CreateOnDataChange(
sessionId,
hLMXServerHandle,
phItemHandle,
pvItemValue,
pwItemQuality,
pftItemTimeStamp,
statuses),
mxEvent => valueCache.Set(hLMXServerHandle, phItemHandle, mxEvent));
}
/// <summary>
@@ -152,9 +185,25 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
private void EnqueueEvent(Func<Proto.MxEvent> createEvent)
{
EnqueueEvent(createEvent, postPublish: null);
}
private void EnqueueEvent(Func<Proto.MxEvent> createEvent, Action<Proto.MxEvent>? postPublish)
{
Proto.MxEvent mxEvent;
try
{
eventQueue.Enqueue(createEvent());
mxEvent = createEvent();
}
catch (Exception exception)
{
eventQueue.RecordFault(CreateEventConversionFault(exception));
return;
}
try
{
eventQueue.Enqueue(mxEvent);
}
catch (Exception exception)
{
@@ -169,6 +218,22 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
// this catch's RecordFault call is then a deliberate near
// no-op rather than a second, conflicting fault.
eventQueue.RecordFault(CreateEventConversionFault(exception));
return;
}
// Only publish to caches/observers after the event has cleared the
// queue, so a queue overflow does not leak a "fresher" cached value
// than what was actually shipped to the gateway.
if (postPublish is not null)
{
try
{
postPublish(mxEvent);
}
catch (Exception exception)
{
eventQueue.RecordFault(CreateEventConversionFault(exception));
}
}
}
@@ -11,16 +11,20 @@ namespace MxGateway.Worker.MxAccess;
/// </summary>
public sealed class MxAccessCommandExecutor : IStaCommandExecutor
{
/// <summary>Default per-tag timeout used when <c>ReadBulkCommand.timeout_ms</c> is zero.</summary>
internal static readonly TimeSpan DefaultReadBulkTimeout = TimeSpan.FromMilliseconds(1000);
private readonly MxAccessSession session;
private readonly VariantConverter variantConverter;
private readonly IAlarmCommandHandler? alarmCommandHandler;
private readonly Action pumpStep;
/// <summary>
/// Initializes a command executor with an MXAccess session.
/// </summary>
/// <param name="session">MXAccess session on the STA thread.</param>
public MxAccessCommandExecutor(MxAccessSession session)
: this(session, new VariantConverter(), alarmCommandHandler: null)
: this(session, new VariantConverter(), alarmCommandHandler: null, pumpStep: null)
{
}
@@ -32,7 +36,7 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
public MxAccessCommandExecutor(
MxAccessSession session,
VariantConverter variantConverter)
: this(session, variantConverter, alarmCommandHandler: null)
: this(session, variantConverter, alarmCommandHandler: null, pumpStep: null)
{
}
@@ -46,10 +50,29 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
MxAccessSession session,
VariantConverter variantConverter,
IAlarmCommandHandler? alarmCommandHandler)
: this(session, variantConverter, alarmCommandHandler, pumpStep: null)
{
}
/// <summary>
/// Initializes a command executor with an MXAccess session, variant
/// converter, alarm command handler, and a Windows-message pump action.
/// The pump action is invoked from inside <c>ReadBulk</c>'s wait loop so
/// MXAccess COM events queued for this STA can be dispatched while the
/// executor is still holding the thread. Pass <c>null</c> in tests where
/// ReadBulk is exercised against a fake worker that pre-populates the
/// value cache — the executor falls back to a no-op pump step.
/// </summary>
public MxAccessCommandExecutor(
MxAccessSession session,
VariantConverter variantConverter,
IAlarmCommandHandler? alarmCommandHandler,
Action? pumpStep)
{
this.session = session ?? throw new ArgumentNullException(nameof(session));
this.variantConverter = variantConverter ?? throw new ArgumentNullException(nameof(variantConverter));
this.alarmCommandHandler = alarmCommandHandler;
this.pumpStep = pumpStep ?? (static () => { });
}
/// <summary>
@@ -84,6 +107,11 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
MxCommandKind.UnAdviseItemBulk => ExecuteUnAdviseItemBulk(command),
MxCommandKind.SubscribeBulk => ExecuteSubscribeBulk(command),
MxCommandKind.UnsubscribeBulk => ExecuteUnsubscribeBulk(command),
MxCommandKind.WriteBulk => ExecuteWriteBulk(command),
MxCommandKind.Write2Bulk => ExecuteWrite2Bulk(command),
MxCommandKind.WriteSecuredBulk => ExecuteWriteSecuredBulk(command),
MxCommandKind.WriteSecured2Bulk => ExecuteWriteSecured2Bulk(command),
MxCommandKind.ReadBulk => ExecuteReadBulk(command),
MxCommandKind.SubscribeAlarms => ExecuteSubscribeAlarms(command),
MxCommandKind.UnsubscribeAlarms => ExecuteUnsubscribeAlarms(command),
MxCommandKind.AcknowledgeAlarm => ExecuteAcknowledgeAlarm(command),
@@ -407,6 +435,149 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
session.UnsubscribeBulk(unsubscribeBulkCommand.ServerHandle, unsubscribeBulkCommand.ItemHandles));
}
private MxCommandReply ExecuteWriteBulk(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.WriteBulk)
{
return CreateInvalidRequestReply(command, "WriteBulk command payload is required.");
}
WriteBulkCommand writeBulkCommand = command.Command.WriteBulk;
foreach (WriteBulkEntry entry in writeBulkCommand.Entries)
{
if (entry.Value is null)
{
return CreateInvalidRequestReply(
command,
$"WriteBulk entry for item handle {entry.ItemHandle} is missing its value.");
}
}
return CreateBulkWriteReply(
command,
session.WriteBulk(
writeBulkCommand.ServerHandle,
writeBulkCommand.Entries,
value => variantConverter.ConvertToComValue(value)));
}
private MxCommandReply ExecuteWrite2Bulk(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.Write2Bulk)
{
return CreateInvalidRequestReply(command, "Write2Bulk command payload is required.");
}
Write2BulkCommand write2BulkCommand = command.Command.Write2Bulk;
foreach (Write2BulkEntry entry in write2BulkCommand.Entries)
{
if (entry.Value is null)
{
return CreateInvalidRequestReply(
command,
$"Write2Bulk entry for item handle {entry.ItemHandle} is missing its value.");
}
if (entry.TimestampValue is null)
{
return CreateInvalidRequestReply(
command,
$"Write2Bulk entry for item handle {entry.ItemHandle} is missing its timestamp value.");
}
}
return CreateBulkWriteReply(
command,
session.Write2Bulk(
write2BulkCommand.ServerHandle,
write2BulkCommand.Entries,
value => variantConverter.ConvertToComValue(value)));
}
private MxCommandReply ExecuteWriteSecuredBulk(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.WriteSecuredBulk)
{
return CreateInvalidRequestReply(command, "WriteSecuredBulk command payload is required.");
}
WriteSecuredBulkCommand writeSecuredBulkCommand = command.Command.WriteSecuredBulk;
foreach (WriteSecuredBulkEntry entry in writeSecuredBulkCommand.Entries)
{
if (entry.Value is null)
{
return CreateInvalidRequestReply(
command,
$"WriteSecuredBulk entry for item handle {entry.ItemHandle} is missing its value.");
}
}
return CreateBulkWriteReply(
command,
session.WriteSecuredBulk(
writeSecuredBulkCommand.ServerHandle,
writeSecuredBulkCommand.Entries,
value => variantConverter.ConvertToComValue(value)));
}
private MxCommandReply ExecuteWriteSecured2Bulk(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.WriteSecured2Bulk)
{
return CreateInvalidRequestReply(command, "WriteSecured2Bulk command payload is required.");
}
WriteSecured2BulkCommand writeSecured2BulkCommand = command.Command.WriteSecured2Bulk;
foreach (WriteSecured2BulkEntry entry in writeSecured2BulkCommand.Entries)
{
if (entry.Value is null)
{
return CreateInvalidRequestReply(
command,
$"WriteSecured2Bulk entry for item handle {entry.ItemHandle} is missing its value.");
}
if (entry.TimestampValue is null)
{
return CreateInvalidRequestReply(
command,
$"WriteSecured2Bulk entry for item handle {entry.ItemHandle} is missing its timestamp value.");
}
}
return CreateBulkWriteReply(
command,
session.WriteSecured2Bulk(
writeSecured2BulkCommand.ServerHandle,
writeSecured2BulkCommand.Entries,
value => variantConverter.ConvertToComValue(value)));
}
private MxCommandReply ExecuteReadBulk(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.ReadBulk)
{
return CreateInvalidRequestReply(command, "ReadBulk command payload is required.");
}
ReadBulkCommand readBulkCommand = command.Command.ReadBulk;
TimeSpan timeout = readBulkCommand.TimeoutMs == 0
? DefaultReadBulkTimeout
: TimeSpan.FromMilliseconds(readBulkCommand.TimeoutMs);
IReadOnlyList<BulkReadResult> results = session.ReadBulk(
readBulkCommand.ServerHandle,
readBulkCommand.TagAddresses,
timeout,
pumpStep);
MxCommandReply reply = CreateOkReply(command);
BulkReadReply bulkReply = new();
bulkReply.Results.Add(results);
reply.ReadBulk = bulkReply;
return reply;
}
private MxCommandReply ExecuteSubscribeAlarms(StaCommand command)
{
if (command.Command.PayloadCase != MxCommand.PayloadOneofCase.SubscribeAlarms)
@@ -653,6 +824,35 @@ public sealed class MxAccessCommandExecutor : IStaCommandExecutor
return reply;
}
private static MxCommandReply CreateBulkWriteReply(
StaCommand command,
IEnumerable<BulkWriteResult> results)
{
MxCommandReply reply = CreateOkReply(command);
BulkWriteReply bulkReply = new();
bulkReply.Results.Add(results);
switch (command.Kind)
{
case MxCommandKind.WriteBulk:
reply.WriteBulk = bulkReply;
break;
case MxCommandKind.Write2Bulk:
reply.Write2Bulk = bulkReply;
break;
case MxCommandKind.WriteSecuredBulk:
reply.WriteSecuredBulk = bulkReply;
break;
case MxCommandKind.WriteSecured2Bulk:
reply.WriteSecured2Bulk = bulkReply;
break;
default:
throw new InvalidOperationException($"Unsupported bulk write command kind {command.Kind}.");
}
return reply;
}
private static MxCommandReply CreateInvalidRequestReply(
StaCommand command,
string message)
@@ -12,6 +12,7 @@ public sealed class MxAccessSession : IDisposable
private readonly IMxAccessServer mxAccessServer;
private readonly IMxAccessEventSink eventSink;
private readonly MxAccessHandleRegistry handleRegistry;
private readonly MxAccessValueCache valueCache;
private bool disposed;
private MxAccessSession(
@@ -19,12 +20,14 @@ public sealed class MxAccessSession : IDisposable
IMxAccessServer mxAccessServer,
IMxAccessEventSink eventSink,
MxAccessHandleRegistry handleRegistry,
MxAccessValueCache valueCache,
int creationThreadId)
{
this.mxAccessComObject = mxAccessComObject ?? throw new ArgumentNullException(nameof(mxAccessComObject));
this.mxAccessServer = mxAccessServer ?? throw new ArgumentNullException(nameof(mxAccessServer));
this.eventSink = eventSink ?? throw new ArgumentNullException(nameof(eventSink));
this.handleRegistry = handleRegistry ?? throw new ArgumentNullException(nameof(handleRegistry));
this.valueCache = valueCache ?? throw new ArgumentNullException(nameof(valueCache));
CreationThreadId = creationThreadId;
}
@@ -34,6 +37,14 @@ public sealed class MxAccessSession : IDisposable
/// <summary>The registry for tracking opened handles.</summary>
public MxAccessHandleRegistry HandleRegistry => handleRegistry;
/// <summary>
/// Per-session last-value cache populated by the event sink. ReadBulk
/// consults this cache before falling back to its own snapshot
/// lifecycle so it can serve a "current value" for an already-advised
/// tag without touching the existing subscription.
/// </summary>
public MxAccessValueCache ValueCache => valueCache;
/// <summary>Creates a WorkerReady message with session metadata.</summary>
/// <param name="workerProcessId">Process ID of the worker.</param>
public WorkerReady CreateWorkerReady(int workerProcessId)
@@ -78,11 +89,21 @@ public sealed class MxAccessSession : IDisposable
eventSink.Attach(mxAccessComObject, sessionId);
// Share the event sink's value cache when one is wired (the
// production MxAccessBaseEventSink path) so OnDataChange writes and
// ReadBulk reads both see the same instance. Fall back to a fresh
// cache for test fakes that supply their own sink — ReadBulk simply
// never serves cached values in that case.
MxAccessValueCache valueCache = eventSink is MxAccessBaseEventSink baseSink
? baseSink.ValueCache
: new MxAccessValueCache();
return new MxAccessSession(
mxAccessComObject,
new MxAccessComServer(mxAccessComObject),
eventSink,
new MxAccessHandleRegistry(),
valueCache,
Environment.CurrentManagedThreadId);
}
catch (Exception exception)
@@ -180,6 +201,10 @@ public sealed class MxAccessSession : IDisposable
mxAccessServer.RemoveItem(serverHandle, itemHandle);
handleRegistry.RemoveItemHandle(serverHandle, itemHandle);
// Evict the last-value entry so a future AddItem + Advise on the
// same handle id (which MXAccess may reuse) does not serve a stale
// OnDataChange snapshot from the previous lifetime.
valueCache.Remove(serverHandle, itemHandle);
}
/// <summary>Advises on item changes with plain subscription.</summary>
@@ -513,6 +538,394 @@ public sealed class MxAccessSession : IDisposable
return results;
}
/// <summary>
/// Bulk write — runs <see cref="Write"/> sequentially for each entry.
/// Each entry's <paramref name="convertValue"/> turns the protobuf
/// MxValue into a COM-marshalable variant. Per-item failures are
/// captured as <see cref="BulkWriteResult"/> entries with
/// <c>was_successful = false</c>; the loop never throws.
/// </summary>
public IReadOnlyList<BulkWriteResult> WriteBulk(
int serverHandle,
IReadOnlyList<WriteBulkEntry> entries,
Func<MxValue, object?> convertValue)
{
ThrowIfDisposed();
if (entries is null)
{
throw new ArgumentNullException(nameof(entries));
}
if (convertValue is null)
{
throw new ArgumentNullException(nameof(convertValue));
}
List<BulkWriteResult> results = new(entries.Count);
foreach (WriteBulkEntry entry in entries)
{
results.Add(ExecuteBulkWriteEntry(
serverHandle,
entry.ItemHandle,
() => Write(serverHandle, entry.ItemHandle, convertValue(entry.Value), entry.UserId)));
}
return results;
}
/// <summary>Bulk Write2 — sequential MXAccess <see cref="Write2"/> per entry.</summary>
public IReadOnlyList<BulkWriteResult> Write2Bulk(
int serverHandle,
IReadOnlyList<Write2BulkEntry> entries,
Func<MxValue, object?> convertValue)
{
ThrowIfDisposed();
if (entries is null)
{
throw new ArgumentNullException(nameof(entries));
}
if (convertValue is null)
{
throw new ArgumentNullException(nameof(convertValue));
}
List<BulkWriteResult> results = new(entries.Count);
foreach (Write2BulkEntry entry in entries)
{
results.Add(ExecuteBulkWriteEntry(
serverHandle,
entry.ItemHandle,
() => Write2(
serverHandle,
entry.ItemHandle,
convertValue(entry.Value),
convertValue(entry.TimestampValue),
entry.UserId)));
}
return results;
}
/// <summary>Bulk WriteSecured — sequential MXAccess <see cref="WriteSecured"/> per entry.</summary>
public IReadOnlyList<BulkWriteResult> WriteSecuredBulk(
int serverHandle,
IReadOnlyList<WriteSecuredBulkEntry> entries,
Func<MxValue, object?> convertValue)
{
ThrowIfDisposed();
if (entries is null)
{
throw new ArgumentNullException(nameof(entries));
}
if (convertValue is null)
{
throw new ArgumentNullException(nameof(convertValue));
}
List<BulkWriteResult> results = new(entries.Count);
foreach (WriteSecuredBulkEntry entry in entries)
{
results.Add(ExecuteBulkWriteEntry(
serverHandle,
entry.ItemHandle,
() => WriteSecured(
serverHandle,
entry.ItemHandle,
entry.CurrentUserId,
entry.VerifierUserId,
convertValue(entry.Value))));
}
return results;
}
/// <summary>Bulk WriteSecured2 — sequential MXAccess <see cref="WriteSecured2"/> per entry.</summary>
public IReadOnlyList<BulkWriteResult> WriteSecured2Bulk(
int serverHandle,
IReadOnlyList<WriteSecured2BulkEntry> entries,
Func<MxValue, object?> convertValue)
{
ThrowIfDisposed();
if (entries is null)
{
throw new ArgumentNullException(nameof(entries));
}
if (convertValue is null)
{
throw new ArgumentNullException(nameof(convertValue));
}
List<BulkWriteResult> results = new(entries.Count);
foreach (WriteSecured2BulkEntry entry in entries)
{
results.Add(ExecuteBulkWriteEntry(
serverHandle,
entry.ItemHandle,
() => WriteSecured2(
serverHandle,
entry.ItemHandle,
entry.CurrentUserId,
entry.VerifierUserId,
convertValue(entry.Value),
convertValue(entry.TimestampValue))));
}
return results;
}
/// <summary>
/// Bulk read snapshot. For each requested tag, returns the most recent
/// OnDataChange value if the tag is already advised AND a cached value
/// exists (no subscription side effects); otherwise takes the AddItem
/// + Advise + wait + UnAdvise + RemoveItem snapshot lifecycle itself.
/// <paramref name="timeout"/> bounds the wait per-tag in the snapshot
/// case; <paramref name="pumpStep"/> is invoked on every poll
/// iteration so the worker's STA can dispatch the incoming MXAccess
/// message that carries the value.
/// </summary>
public IReadOnlyList<BulkReadResult> ReadBulk(
int serverHandle,
IReadOnlyList<string> tagAddresses,
TimeSpan timeout,
Action pumpStep)
{
ThrowIfDisposed();
if (tagAddresses is null)
{
throw new ArgumentNullException(nameof(tagAddresses));
}
if (pumpStep is null)
{
throw new ArgumentNullException(nameof(pumpStep));
}
List<BulkReadResult> results = new(tagAddresses.Count);
foreach (string? tagAddress in tagAddresses)
{
if (string.IsNullOrWhiteSpace(tagAddress))
{
results.Add(FailedRead(serverHandle, tagAddress ?? string.Empty, itemHandle: 0, wasCached: false, "Tag address is required."));
continue;
}
results.Add(ReadOneTag(serverHandle, tagAddress, timeout, pumpStep));
}
return results;
}
private BulkReadResult ReadOneTag(
int serverHandle,
string tagAddress,
TimeSpan timeout,
Action pumpStep)
{
// 1. Cached-and-advised fast path: scan the registry for a live item
// handle matching this tag and check whether the value cache has a
// payload for it. If so, return the cached value without touching
// the existing subscription — the caller didn't create it, so
// ReadBulk must not tear it down.
if (TryGetCachedReadFor(serverHandle, tagAddress, out int cachedItemHandle, out MxAccessValueCache.CachedValue cachedValue))
{
return SucceededRead(
serverHandle,
tagAddress,
cachedItemHandle,
wasCached: true,
cachedValue);
}
// 2. Snapshot lifecycle. Reserve our own item handle, advise, pump
// until we see a fresh OnDataChange (or the deadline elapses),
// then tear it down.
int itemHandle = 0;
bool advised = false;
try
{
itemHandle = AddItem(serverHandle, tagAddress);
ulong baseline = valueCache.CurrentVersion(serverHandle, itemHandle);
Advise(serverHandle, itemHandle);
advised = true;
DateTime deadline = DateTime.UtcNow + timeout;
bool gotValue = valueCache.TryWaitForUpdate(
serverHandle,
itemHandle,
baseline,
deadline,
pumpStep,
out MxAccessValueCache.CachedValue snapshot);
return gotValue
? SucceededRead(serverHandle, tagAddress, itemHandle, wasCached: false, snapshot)
: FailedRead(
serverHandle,
tagAddress,
itemHandle,
wasCached: false,
$"ReadBulk timed out after {timeout.TotalMilliseconds:F0} ms waiting for first OnDataChange.");
}
catch (Exception exception)
{
return FailedRead(serverHandle, tagAddress, itemHandle, wasCached: false, exception.Message);
}
finally
{
// Snapshot teardown — best-effort. Errors here are noted on the
// diagnostic message of the original result (above) by appending
// a cleanup suffix; we never re-throw from finally.
if (advised)
{
try { UnAdvise(serverHandle, itemHandle); } catch { /* swallow — best effort */ }
}
if (itemHandle != 0)
{
try { RemoveItem(serverHandle, itemHandle); } catch { /* swallow — best effort */ }
}
}
}
private bool TryGetCachedReadFor(
int serverHandle,
string tagAddress,
out int itemHandle,
out MxAccessValueCache.CachedValue cachedValue)
{
// Linear scan — bulk-read sizes are small in practice and the registry
// is keyed by handle, not by tag. If profiling ever shows this hot, a
// reverse tag→handle map can be added on the registry side.
foreach (RegisteredItemHandle registered in handleRegistry.ItemHandles)
{
if (registered.ServerHandle != serverHandle)
{
continue;
}
if (!string.Equals(registered.ItemDefinition, tagAddress, StringComparison.Ordinal))
{
continue;
}
if (!handleRegistry.ContainsAdviceHandle(serverHandle, registered.ItemHandle, MxAccessAdviceKind.Plain)
&& !handleRegistry.ContainsAdviceHandle(serverHandle, registered.ItemHandle, MxAccessAdviceKind.Supervisory))
{
// Tag is added but not advised — no fresh OnDataChange will
// arrive without us advising. Fall through to the snapshot
// path which advises explicitly.
continue;
}
if (valueCache.TryGet(serverHandle, registered.ItemHandle, out cachedValue))
{
itemHandle = registered.ItemHandle;
return true;
}
}
itemHandle = 0;
cachedValue = default;
return false;
}
private BulkWriteResult ExecuteBulkWriteEntry(
int serverHandle,
int itemHandle,
Action invokeWrite)
{
try
{
invokeWrite();
return new BulkWriteResult
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
WasSuccessful = true,
ErrorMessage = string.Empty,
};
}
catch (System.Runtime.InteropServices.COMException comException)
{
BulkWriteResult result = new()
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
WasSuccessful = false,
ErrorMessage = comException.Message,
};
result.Hresult = comException.HResult;
return result;
}
catch (Exception exception)
{
return new BulkWriteResult
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
WasSuccessful = false,
ErrorMessage = exception.Message,
};
}
}
private static BulkReadResult SucceededRead(
int serverHandle,
string tagAddress,
int itemHandle,
bool wasCached,
MxAccessValueCache.CachedValue snapshot)
{
BulkReadResult result = new()
{
ServerHandle = serverHandle,
TagAddress = tagAddress,
ItemHandle = itemHandle,
WasSuccessful = true,
WasCached = wasCached,
Quality = snapshot.Quality,
ErrorMessage = string.Empty,
};
if (snapshot.Value is not null)
{
result.Value = snapshot.Value;
}
if (snapshot.SourceTimestamp is not null)
{
result.SourceTimestamp = snapshot.SourceTimestamp;
}
if (snapshot.Statuses is not null)
{
result.Statuses.Add(snapshot.Statuses);
}
return result;
}
private static BulkReadResult FailedRead(
int serverHandle,
string tagAddress,
int itemHandle,
bool wasCached,
string errorMessage)
{
return new BulkReadResult
{
ServerHandle = serverHandle,
TagAddress = tagAddress,
ItemHandle = itemHandle,
WasSuccessful = false,
WasCached = wasCached,
ErrorMessage = errorMessage,
};
}
/// <summary>Gracefully shuts down the session, cleaning up all handles.</summary>
public MxAccessShutdownResult ShutdownGracefully()
{
@@ -196,7 +196,13 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
new MxAccessCommandExecutor(
session,
new VariantConverter(),
alarmCommandHandler));
alarmCommandHandler,
// ReadBulk needs to pump Windows messages while it waits
// for the first OnDataChange callback so the inbound COM
// event can dispatch on this same STA thread. The pump
// step closes over staRuntime so it always pumps the
// pump tied to the apartment that owns this session.
pumpStep: () => staRuntime.PumpPendingMessages()));
return session.CreateWorkerReady(workerProcessId);
},
@@ -0,0 +1,190 @@
using System;
using System.Collections.Generic;
using System.Threading;
using Google.Protobuf.Collections;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.MxAccess;
/// <summary>
/// Per-session cache of the most recent <c>OnDataChange</c> payload for
/// each (server handle, item handle) pair. Written by the MXAccess event
/// sink as new OnDataChange callbacks arrive; read by the ReadBulk command
/// executor so it can satisfy a "current value" request from a tag that is
/// already advised without modifying the existing subscription.
/// </summary>
/// <remarks>
/// Both writers and readers run on the worker's STA thread (COM dispatches
/// events on the apartment thread; commands also execute on the STA), so
/// no internal locking is required. The class is still nominally
/// thread-safe via a single sync root in case tests drive it from a
/// non-STA thread.
/// </remarks>
public sealed class MxAccessValueCache
{
private readonly Dictionary<long, CachedValue> entries = new();
private readonly object syncRoot = new();
/// <summary>Records a fresh OnDataChange payload for the given handle pair.</summary>
/// <param name="serverHandle">MXAccess server handle.</param>
/// <param name="itemHandle">MXAccess item handle.</param>
/// <param name="mxEvent">The protobuf MxEvent created by the event mapper.</param>
public void Set(
int serverHandle,
int itemHandle,
MxEvent mxEvent)
{
if (mxEvent is null)
{
throw new ArgumentNullException(nameof(mxEvent));
}
long key = CreateItemKey(serverHandle, itemHandle);
lock (syncRoot)
{
ulong nextVersion = entries.TryGetValue(key, out CachedValue existing)
? existing.Version + 1
: 1UL;
entries[key] = new CachedValue(
nextVersion,
mxEvent.Value,
mxEvent.Quality,
mxEvent.SourceTimestamp,
mxEvent.Statuses);
}
}
/// <summary>Tries to read the most recent cached value for the handle pair.</summary>
public bool TryGet(
int serverHandle,
int itemHandle,
out CachedValue value)
{
long key = CreateItemKey(serverHandle, itemHandle);
lock (syncRoot)
{
return entries.TryGetValue(key, out value);
}
}
/// <summary>
/// Removes the cache slot for a handle pair. The session calls this
/// when an item is unregistered so stale values are not served to a
/// subsequent ReadBulk after a tag is removed and re-added.
/// </summary>
public void Remove(
int serverHandle,
int itemHandle)
{
long key = CreateItemKey(serverHandle, itemHandle);
lock (syncRoot)
{
entries.Remove(key);
}
}
/// <summary>
/// Waits until the cache entry's version exceeds <paramref name="sinceVersion"/>
/// or the deadline elapses, calling <paramref name="pumpStep"/> on every poll
/// iteration so the worker's STA can dispatch the inbound MXAccess message.
/// </summary>
/// <param name="serverHandle">MXAccess server handle.</param>
/// <param name="itemHandle">MXAccess item handle.</param>
/// <param name="sinceVersion">Version snapshot captured before the wait.</param>
/// <param name="deadlineUtc">Absolute UTC deadline.</param>
/// <param name="pumpStep">Action that pumps any pending Windows messages.</param>
/// <param name="pollIntervalMs">How long to sleep between pump cycles. Default 5 ms.</param>
public bool TryWaitForUpdate(
int serverHandle,
int itemHandle,
ulong sinceVersion,
DateTime deadlineUtc,
Action pumpStep,
out CachedValue value,
int pollIntervalMs = 5)
{
if (pumpStep is null)
{
throw new ArgumentNullException(nameof(pumpStep));
}
while (true)
{
pumpStep();
if (TryGet(serverHandle, itemHandle, out value) && value.Version > sinceVersion)
{
return true;
}
if (DateTime.UtcNow >= deadlineUtc)
{
return false;
}
Thread.Sleep(pollIntervalMs);
}
}
/// <summary>Returns the current version for a handle pair, or 0 if no entry exists.</summary>
public ulong CurrentVersion(
int serverHandle,
int itemHandle)
{
return TryGet(serverHandle, itemHandle, out CachedValue existing)
? existing.Version
: 0UL;
}
private static long CreateItemKey(
int serverHandle,
int itemHandle)
{
return ((long)serverHandle << 32) | (uint)itemHandle;
}
/// <summary>
/// Snapshot of the most recent OnDataChange payload for a handle pair.
/// <see cref="Version"/> increments by one on every <see cref="Set"/>
/// call so the bulk read executor can detect "a new value arrived
/// since I started waiting".
/// </summary>
/// <remarks>
/// Plain readonly struct (not a record) so this compiles under the
/// worker's net48 target, which lacks <c>IsExternalInit</c>.
/// </remarks>
public readonly struct CachedValue
{
/// <summary>Initializes a new cached value snapshot.</summary>
public CachedValue(
ulong version,
MxValue value,
int quality,
Timestamp sourceTimestamp,
RepeatedField<MxStatusProxy> statuses)
{
Version = version;
Value = value;
Quality = quality;
SourceTimestamp = sourceTimestamp;
Statuses = statuses;
}
/// <summary>Monotonic per-handle version counter.</summary>
public ulong Version { get; }
/// <summary>The cached MxValue payload.</summary>
public MxValue Value { get; }
/// <summary>Quality code from the OnDataChange event.</summary>
public int Quality { get; }
/// <summary>Source timestamp from the OnDataChange event.</summary>
public Timestamp SourceTimestamp { get; }
/// <summary>MxStatusProxy entries from the OnDataChange event.</summary>
public RepeatedField<MxStatusProxy> Statuses { get; }
}
}
+9
View File
@@ -79,6 +79,15 @@ public sealed class StaRuntime : IDisposable
/// </summary>
public bool IsRunning => startedEvent.IsSet && !stoppedEvent.IsSet;
/// <summary>
/// Pumps any pending Windows messages on the calling thread. Intended
/// for commands that synchronously hold the STA (e.g. ReadBulk) and
/// must allow inbound MXAccess COM events to dispatch while they
/// wait. Callers must already be on the STA; the method is otherwise
/// safe (PeekMessage simply finds no messages).
/// </summary>
public int PumpPendingMessages() => messagePump.PumpPendingMessages();
/// <summary>
/// Starts the STA thread.
/// </summary>